Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.
Gestione degli errori per un'origine eventi SQS in Lambda
Per gestire gli errori relativi a un'origine eventi SQS, Lambda utilizza automaticamente una strategia di riprova con una strategia di backoff. Puoi anche personalizzare il comportamento di gestione degli errori configurando lo strumento di mappatura dell'origine degli eventi SQS per restituire risposte parziali in batch.
Strategia di backoff per le chiamate non riuscite
Quando una chiamata non riesce, Lambda riprovare la chiamata mentre implementa una strategia di backoff. La strategia di backoff varia leggermente a seconda che Lambda abbia riscontrato l'errore a causa di un errore nel codice della funzione o a causa di una limitazione.
-
Se il codice della funzione ha causato l'errore, Lambda interromperà l'elaborazione e riproverà l'invocazione. Allo stesso tempo, Lambda si interrompe gradualmente, riducendo la quantità di simultaneità allocata allo strumento di mappatura dell'origine degli eventi di Amazon SQS. Una volta scaduto il timeout di visibilità della coda, il messaggio riapparirà nuovamente nella coda.
-
Se la chiamata non riesce a causa della limitazione, Lambda interrompe gradualmente i nuovi tentativi riducendo la quantità di simultaneità allocata allo strumento di mappatura dell'origine degli eventi di Amazon SQS. Lambda continua a riprovare il messaggio fino a quando il timestamp del messaggio non supera il timeout di visibilità della coda e a quel punto Lambda elimina il messaggio.
Implementazione di risposte batch parziali
Per impostazione predefinita, se la funzione rileva un errore durante l'elaborazione di un batch, tutti i messaggi in quel batch diventano nuovamente visibili nella coda, inclusi i messaggi che Lambda ha elaborato correttamente. Di conseguenza, la tua funzione potrebbe elaborare lo stesso messaggio più volte.
Per evitare di rielaborare tutti i messaggi correttamente elaborati in un batch con errori, puoi configurare lo strumento di mappatura dell'origine degli eventi in modo da rendere nuovamente visibili solo i messaggi con errori. Questa operazione è nota come risposta batch parziale. Per attivare le risposte parziali in batch, specifica l'FunctionResponseTypesazione da eseguire durante ReportBatchItemFailures la configurazione della mappatura delle sorgenti degli eventi. Ciò consente alla funzione di restituire un completamento parziale, riducendo così il numero di tentativi non necessari sui registri.
L'utilità Batch Processor di Powertools AWS Lambda gestisce automaticamente tutta la logica di risposta batch parziale. Questa utilità semplifica l'implementazione dei modelli di elaborazione in batch e riduce il codice personalizzato necessario per gestire correttamente gli errori degli articoli in batch. È disponibile per Python, Java, Typescript e.NET.
Se ReportBatchItemFailures è attivata, Lambda non riduce il polling dei messaggi quando le invocazioni delle funzioni hanno esito negativo. Se prevedi che alcuni messaggi falliscano e non desideri che tali errori influiscano sulla velocità di elaborazione dei messaggi, utilizza ReportBatchItemFailures.
Quando utilizzi le risposte batch, tieni presente quanto segue:
-
Se la funzione restituisce un'eccezione, l'intero batch viene considerato completamente non riuscito.
-
Se si utilizza questa caratteristica con una coda FIFO, la funzione dovrebbe interrompere l'elaborazione dei messaggi dopo il primo errore e restituire tutti i messaggi di errore e non elaborati in batchItemFailures. Ciò aiuta a preservare l'ordine dei messaggi nella coda.
Attivazione di report batch parziali
-
Rivedi le best practice per l'implementazione di risposte batch parziali.
-
Per attivare ReportBatchItemFailures per la tua funzione, esegui il comando riportato di seguito. Per recuperare l'UUID della mappatura della sorgente degli eventi, esegui il comando. list-event-source-mappings AWS CLI
aws lambda update-event-source-mapping \
--uuid "a1b2c3d4-5678-90ab-cdef-11111EXAMPLE" \
--function-response-types "ReportBatchItemFailures"
-
Aggiorna il codice della funzione per rilevare tutte le eccezioni e restituire i messaggi con errori in una risposta batchItemFailures in JSON. La batchItemFailures risposta deve includere un elenco di messaggi IDs, sotto forma di valori JSON. itemIdentifier
Ad esempio, supponiamo di avere un batch di cinque messaggi, IDs id1, id2 id3id4, e. id5 La tua funzione elabora correttamente id1, id3, e id5. Per rendere i messaggi id2 e id4 di nuovo visibili nella coda, la funzione dovrebbe restituire la seguente risposta:
{
"batchItemFailures": [
{
"itemIdentifier": "id2"
},
{
"itemIdentifier": "id4"
}
]
}
Di seguito sono riportati alcuni esempi di codice di funzione che restituiscono l'elenco dei messaggi non riusciti IDs nel batch:
- .NET
-
- SDK per .NET
-
C'è di più su GitHub. Trova l'esempio completo e scopri come eseguire la configurazione e l'esecuzione nel repository di Esempi serverless.
Segnalazione di errori di elementi batch di SQS con Lambda tramite .NET.
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
using Amazon.Lambda.Core;
using Amazon.Lambda.SQSEvents;
// Assembly attribute to enable the Lambda function's JSON input to be converted into a .NET class.
[assembly: LambdaSerializer(typeof(Amazon.Lambda.Serialization.SystemTextJson.DefaultLambdaJsonSerializer))]
namespace sqsSample;
public class Function
{
public async Task<SQSBatchResponse> FunctionHandler(SQSEvent evnt, ILambdaContext context)
{
List<SQSBatchResponse.BatchItemFailure> batchItemFailures = new List<SQSBatchResponse.BatchItemFailure>();
foreach(var message in evnt.Records)
{
try
{
//process your message
await ProcessMessageAsync(message, context);
}
catch (System.Exception)
{
//Add failed message identifier to the batchItemFailures list
batchItemFailures.Add(new SQSBatchResponse.BatchItemFailure{ItemIdentifier=message.MessageId});
}
}
return new SQSBatchResponse(batchItemFailures);
}
private async Task ProcessMessageAsync(SQSEvent.SQSMessage message, ILambdaContext context)
{
if (String.IsNullOrEmpty(message.Body))
{
throw new Exception("No Body in SQS Message.");
}
context.Logger.LogInformation($"Processed message {message.Body}");
// TODO: Do interesting work based on the new message
await Task.CompletedTask;
}
}
- Go
-
- SDK per Go V2
-
C'è dell'altro GitHub. Trova l'esempio completo e scopri come eseguire la configurazione e l'esecuzione nel repository di Esempi serverless.
Segnalazione di errori di elementi batch di SQS con Lambda tramite Go.
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
package main
import (
"context"
"fmt"
"github.com/aws/aws-lambda-go/events"
"github.com/aws/aws-lambda-go/lambda"
)
func handler(ctx context.Context, sqsEvent events.SQSEvent) (map[string]interface{}, error) {
batchItemFailures := []map[string]interface{}{}
for _, message := range sqsEvent.Records {
if len(message.Body) > 0 {
// Your message processing condition here
fmt.Printf("Successfully processed message: %s\n", message.Body)
} else {
// Message processing failed
fmt.Printf("Failed to process message %s\n", message.MessageId)
batchItemFailures = append(batchItemFailures, map[string]interface{}{"itemIdentifier": message.MessageId})
}
}
sqsBatchResponse := map[string]interface{}{
"batchItemFailures": batchItemFailures,
}
return sqsBatchResponse, nil
}
func main() {
lambda.Start(handler)
}
- Java
-
- SDK per Java 2.x
-
C'è dell'altro GitHub. Trova l'esempio completo e scopri come eseguire la configurazione e l'esecuzione nel repository di Esempi serverless.
Segnalazione di errori di elementi batch di SQS con Lambda tramite Java.
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.amazonaws.services.lambda.runtime.events.SQSEvent;
import com.amazonaws.services.lambda.runtime.events.SQSBatchResponse;
import java.util.ArrayList;
import java.util.List;
public class ProcessSQSMessageBatch implements RequestHandler<SQSEvent, SQSBatchResponse> {
@Override
public SQSBatchResponse handleRequest(SQSEvent sqsEvent, Context context) {
List<SQSBatchResponse.BatchItemFailure> batchItemFailures = new ArrayList<SQSBatchResponse.BatchItemFailure>();
for (SQSEvent.SQSMessage message : sqsEvent.getRecords()) {
try {
//process your message
} catch (Exception e) {
//Add failed message identifier to the batchItemFailures list
batchItemFailures.add(new SQSBatchResponse.BatchItemFailure(message.getMessageId()));
}
}
return new SQSBatchResponse(batchItemFailures);
}
}
- JavaScript
-
- SDK per JavaScript (v3)
-
C'è altro da fare. GitHub Trova l'esempio completo e scopri come eseguire la configurazione e l'esecuzione nel repository di Esempi serverless.
Segnalazione degli errori degli elementi batch SQS con Lambda using. JavaScript
// Node.js 20.x Lambda runtime, AWS SDK for Javascript V3
export const handler = async (event, context) => {
const batchItemFailures = [];
for (const record of event.Records) {
try {
await processMessageAsync(record, context);
} catch (error) {
batchItemFailures.push({ itemIdentifier: record.messageId });
}
}
return { batchItemFailures };
};
async function processMessageAsync(record, context) {
if (record.body && record.body.includes("error")) {
throw new Error("There is an error in the SQS Message.");
}
console.log(`Processed message: ${record.body}`);
}
Segnalazione degli errori degli elementi batch SQS con Lambda using. TypeScript
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
import { SQSEvent, SQSBatchResponse, Context, SQSBatchItemFailure, SQSRecord } from 'aws-lambda';
export const handler = async (event: SQSEvent, context: Context): Promise<SQSBatchResponse> => {
const batchItemFailures: SQSBatchItemFailure[] = [];
for (const record of event.Records) {
try {
await processMessageAsync(record);
} catch (error) {
batchItemFailures.push({ itemIdentifier: record.messageId });
}
}
return {batchItemFailures: batchItemFailures};
};
async function processMessageAsync(record: SQSRecord): Promise<void> {
if (record.body && record.body.includes("error")) {
throw new Error('There is an error in the SQS Message.');
}
console.log(`Processed message ${record.body}`);
}
- PHP
-
- SDK per PHP
-
C'è altro da fare. GitHub Trova l'esempio completo e scopri come eseguire la configurazione e l'esecuzione nel repository di Esempi serverless.
Segnalazione di errori di elementi batch di SQS con Lambda tramite PHP.
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
<?php
use Bref\Context\Context;
use Bref\Event\Sqs\SqsEvent;
use Bref\Event\Sqs\SqsHandler;
use Bref\Logger\StderrLogger;
require __DIR__ . '/vendor/autoload.php';
class Handler extends SqsHandler
{
private StderrLogger $logger;
public function __construct(StderrLogger $logger)
{
$this->logger = $logger;
}
/**
* @throws JsonException
* @throws \Bref\Event\InvalidLambdaEvent
*/
public function handleSqs(SqsEvent $event, Context $context): void
{
$this->logger->info("Processing SQS records");
$records = $event->getRecords();
foreach ($records as $record) {
try {
// Assuming the SQS message is in JSON format
$message = json_decode($record->getBody(), true);
$this->logger->info(json_encode($message));
// TODO: Implement your custom processing logic here
} catch (Exception $e) {
$this->logger->error($e->getMessage());
// failed processing the record
$this->markAsFailed($record);
}
}
$totalRecords = count($records);
$this->logger->info("Successfully processed $totalRecords SQS records");
}
}
$logger = new StderrLogger();
return new Handler($logger);
- Python
-
- SDK per Python (Boto3)
-
C'è dell'altro GitHub. Trova l'esempio completo e scopri come eseguire la configurazione e l'esecuzione nel repository di Esempi serverless.
Segnalazione di errori di elementi batch di SQS con Lambda tramite Python.
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0
def lambda_handler(event, context):
if event:
batch_item_failures = []
sqs_batch_response = {}
for record in event["Records"]:
try:
print(f"Processed message: {record['body']}")
except Exception as e:
batch_item_failures.append({"itemIdentifier": record['messageId']})
sqs_batch_response["batchItemFailures"] = batch_item_failures
return sqs_batch_response
- Ruby
-
- SDK per Ruby
-
C'è dell'altro GitHub. Trova l'esempio completo e scopri come eseguire la configurazione e l'esecuzione nel repository di Esempi serverless.
Segnalazione di errori di elementi batch di SQS con Lambda tramite Ruby.
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0
require 'json'
def lambda_handler(event:, context:)
if event
batch_item_failures = []
sqs_batch_response = {}
event["Records"].each do |record|
begin
# process message
rescue StandardError => e
batch_item_failures << {"itemIdentifier" => record['messageId']}
end
end
sqs_batch_response["batchItemFailures"] = batch_item_failures
return sqs_batch_response
end
end
- Rust
-
- SDK per Rust
-
C'è dell'altro GitHub. Trova l'esempio completo e scopri come eseguire la configurazione e l'esecuzione nel repository di Esempi serverless.
Segnalazione di errori di elementi batch di SQS con Lambda tramite Rust.
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
use aws_lambda_events::{
event::sqs::{SqsBatchResponse, SqsEvent},
sqs::{BatchItemFailure, SqsMessage},
};
use lambda_runtime::{run, service_fn, Error, LambdaEvent};
async fn process_record(_: &SqsMessage) -> Result<(), Error> {
Err(Error::from("Error processing message"))
}
async fn function_handler(event: LambdaEvent<SqsEvent>) -> Result<SqsBatchResponse, Error> {
let mut batch_item_failures = Vec::new();
for record in event.payload.records {
match process_record(&record).await {
Ok(_) => (),
Err(_) => batch_item_failures.push(BatchItemFailure {
item_identifier: record.message_id.unwrap(),
}),
}
}
Ok(SqsBatchResponse {
batch_item_failures,
})
}
#[tokio::main]
async fn main() -> Result<(), Error> {
run(service_fn(function_handler)).await
}
Se gli eventi non riusciti non tornano in coda, vedi Come posso risolvere i problemi della funzione Lambda SQS? ReportBatchItemFailures nel Knowledge Center. AWS
Condizioni di successo e di errore
Lambda considera un batch completamente riuscito se la funzione restituisce uno dei seguenti elementi:
-
Una batchItemFailures lista vuota
-
Un batchItemFailures elenco nullo
-
Un vuoto EventResponse
-
Un valore nullo EventResponse
Lambda considera un batch completamente non riuscito se la funzione restituisce uno dei seguenti elementi:
-
Risposta JSON non valida
-
Una stringa vuota itemIdentifier
-
Un valore nullo itemIdentifier
-
Un itemIdentifier con un nome chiave errato
-
Un itemIdentifier valore di con un ID messaggio inesistente
CloudWatch metriche
Per determinare se la tua funzione riporta correttamente gli errori degli articoli in batch, puoi monitorare i parametri NumberOfMessagesDeleted e ApproximateAgeOfOldestMessage Amazon SQS in Amazon. CloudWatch
-
NumberOfMessagesDeleted tiene traccia del numero di messaggi rimossi dalla coda. Se questo scende a 0, significa che la risposta alla funzione non sta restituendo correttamente i messaggi non riusciti.
-
ApproximateAgeOfOldestMessage tiene traccia del tempo di permanenza del messaggio meno recente all'interno della coda. Un incremento considerevole di questo parametro può indicare che la funzione non sta restituendo correttamente i messaggi non riusciti.
L'utilità per il processore batch di Powertools gestisce AWS Lambda automaticamente la logica di risposta parziale in batch, riducendo la complessità dell'implementazione della segnalazione degli errori dei batch. Ecco alcuni esempi di utilizzo del processore batch:
- Python
-
Elaborazione di messaggi Amazon SQS con processore AWS Lambda batch.
import json
from aws_lambda_powertools import Logger
from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType, process_partial_response
from aws_lambda_powertools.utilities.data_classes import SQSEvent
from aws_lambda_powertools.utilities.typing import LambdaContext
processor = BatchProcessor(event_type=EventType.SQS)
logger = Logger()
def record_handler(record):
logger.info(record)
# Your business logic here
# Raise an exception to mark this record as failed
def lambda_handler(event, context: LambdaContext):
return process_partial_response(
event=event,
record_handler=record_handler,
processor=processor,
context=context
)
- TypeScript
-
Elaborazione di messaggi Amazon SQS con processore AWS Lambda batch.
import { BatchProcessor, EventType, processPartialResponse } from '@aws-lambda-powertools/batch';
import { Logger } from '@aws-lambda-powertools/logger';
import type { SQSEvent, Context } from 'aws-lambda';
const processor = new BatchProcessor(EventType.SQS);
const logger = new Logger();
const recordHandler = async (record: any): Promise<void> => {
logger.info('Processing record', { record });
// Your business logic here
// Throw an error to mark this record as failed
};
export const handler = async (event: SQSEvent, context: Context) => {
return processPartialResponse(event, recordHandler, processor, {
context,
});
};