在 Lambda 中处理 SQS 事件源错误
为了处理与 SQS 事件源相关的错误,Lambda 会自动使用带有回退策略的重试策略。您还可以通过配置 SQS 事件源映射来返回部分批次响应,从而自定义错误处理行为。
        失败调用的回退策略
        如果调用失败,Lambda 会在实施回退策略时尝试重试调用。回退策略略有不同,具体取决于 Lambda 的故障原因是函数代码中的错误还是节流所致。
        
             
             
        - 
                 如果您的函数代码导致了该错误,Lambda 将停止处理并重试调用。同时,Lambda 会逐渐退出,以减少分配给 Amazon SQS 事件源映射的并发量。队列的可见性超时结束后,消息将再次显示在队列中。 
- 
                如果调用失败是节流造成的,Lambda 会通过减少分配给 Amazon SQS 事件源映射的并发量来逐渐停止重试。Lambda 会继续重试该消息,直到消息的时间戳超过队列的可见性超时,此时 Lambda 会删除该消息。 
 
     
        实施部分批处理响应
        预设情况下,如果您的 Lambda 函数在处理某个批处理时遇到错误,则该批处理中的所有消息都会在队列中重新可见,包括 Lambda 已经成功处理的消息。因此,您的函数最终可能会多次处理同一消息。
        对于处理失败的批处理,要避免重新处理其中已经成功处理的消息,您可以将事件源映射配置为仅使失败的消息重新可见。这称为部分批处理响应。要开启部分批处理响应,请在配置事件源映射时为 FunctionResponseTypes 操作指定 ReportBatchItemFailures。这可以让您的函数返回部分成功,从而有助于减少对记录进行不必要的重试次数。
        Powertools for AWS Lambda 中的批处理实用程序会自动处理所有部分批处理响应逻辑。此实用程序简化了批处理模式的实现,并减少了正确处理批处理项目失败所需的自定义代码。它适用于 Python、Java、Typescript 和 .NET。
激活 ReportBatchItemFailures 后,当函数调用失败时,Lambda 不会 缩减消息轮询范围。如果您预计某些消息会失败,并且不希望这些失败影响消息处理速率,请使用 ReportBatchItemFailures。
        
        激活部分批处理报告
- 
                查看 实施部分批处理响应的最佳实践。 
- 
                运行以下命令来为您的函数激活 ReportBatchItemFailures。要检索事件源映射的 UUID,请运行 list-event-source-mappings AWS CLI 命令。
 aws lambda update-event-source-mapping \
--uuid "a1b2c3d4-5678-90ab-cdef-11111EXAMPLE"\
--function-response-types"ReportBatchItemFailures"
 
- 
                更新您的函数代码以捕获所有异常并在 batchItemFailuresJSON 响应中返回处理失败的消息。batchItemFailures响应必须包含消息 ID 列表,以作为itemIdentifierJSON 值。
 例如,假设一个批处理有五条消息,消息 ID 分别为 id1、id2、id3、id4和id5。您的函数成功处理了id1、id3和id5。要使消息id2和id4在队列中重新可见,您的函数应运行以下响应:
 { 
  "batchItemFailures": [ 
        {
            "itemIdentifier": "id2"
        },
        {
            "itemIdentifier": "id4"
        }
    ]
}
 以下函数代码示例将返回批处理中处理失败消息的 ID 列表: 
    - .NET
- 
            
     
        - 适用于 .NET 的 SDK
- 
 查看 GitHub,了解更多信息。在无服务器示例存储库中查找完整示例,并了解如何进行设置和运行。 
报告使用 .NET 进行 Lambda SQS 批处理项目失败。 // Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
using Amazon.Lambda.Core;
using Amazon.Lambda.SQSEvents;
// Assembly attribute to enable the Lambda function's JSON input to be converted into a .NET class.
[assembly: LambdaSerializer(typeof(Amazon.Lambda.Serialization.SystemTextJson.DefaultLambdaJsonSerializer))]
namespace sqsSample;
public class Function
{
    public async Task<SQSBatchResponse> FunctionHandler(SQSEvent evnt, ILambdaContext context)
    {
        List<SQSBatchResponse.BatchItemFailure> batchItemFailures = new List<SQSBatchResponse.BatchItemFailure>();
        foreach(var message in evnt.Records)
        {
            try
            {
                //process your message
                await ProcessMessageAsync(message, context);
            }
            catch (System.Exception)
            {
                //Add failed message identifier to the batchItemFailures list
                batchItemFailures.Add(new SQSBatchResponse.BatchItemFailure{ItemIdentifier=message.MessageId}); 
            }
        }
        return new SQSBatchResponse(batchItemFailures);
    }
    private async Task ProcessMessageAsync(SQSEvent.SQSMessage message, ILambdaContext context)
    {
        if (String.IsNullOrEmpty(message.Body))
        {
            throw new Exception("No Body in SQS Message.");
        }
        context.Logger.LogInformation($"Processed message {message.Body}");
        // TODO: Do interesting work based on the new message
        await Task.CompletedTask;
    }
}
 
 
 
- Go
- 
            
     
        - SDK for Go V2
- 
 查看 GitHub,了解更多信息。在无服务器示例存储库中查找完整示例,并了解如何进行设置和运行。 
报告使用 Go 进行 Lambda SQS 批处理项目失败。 // Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
package main
import (
	"context"
	"fmt"
	"github.com/aws/aws-lambda-go/events"
	"github.com/aws/aws-lambda-go/lambda"
)
func handler(ctx context.Context, sqsEvent events.SQSEvent) (map[string]interface{}, error) {
	batchItemFailures := []map[string]interface{}{}
	for _, message := range sqsEvent.Records {
		if len(message.Body) > 0 {
			// Your message processing condition here
			fmt.Printf("Successfully processed message: %s\n", message.Body)
		} else {
			// Message processing failed
			fmt.Printf("Failed to process message %s\n", message.MessageId)
			batchItemFailures = append(batchItemFailures, map[string]interface{}{"itemIdentifier": message.MessageId})
		}
	}
	sqsBatchResponse := map[string]interface{}{
		"batchItemFailures": batchItemFailures,
	}
	return sqsBatchResponse, nil
}
func main() {
	lambda.Start(handler)
}
 
 
 
- Java
- 
            
     
        - 适用于 Java 的 SDK 2.x
- 
 查看 GitHub,了解更多信息。在无服务器示例存储库中查找完整示例,并了解如何进行设置和运行。 
报告使用 Java 进行 Lambda SQS 批处理项目失败。 // Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.amazonaws.services.lambda.runtime.events.SQSEvent;
import com.amazonaws.services.lambda.runtime.events.SQSBatchResponse;
 
import java.util.ArrayList;
import java.util.List;
 
public class ProcessSQSMessageBatch implements RequestHandler<SQSEvent, SQSBatchResponse> {
    @Override
    public SQSBatchResponse handleRequest(SQSEvent sqsEvent, Context context) {
         List<SQSBatchResponse.BatchItemFailure> batchItemFailures = new ArrayList<SQSBatchResponse.BatchItemFailure>();
         for (SQSEvent.SQSMessage message : sqsEvent.getRecords()) {
             try {
                 //process your message
             } catch (Exception e) {
                 //Add failed message identifier to the batchItemFailures list
                 batchItemFailures.add(new SQSBatchResponse.BatchItemFailure(message.getMessageId()));
             }
         }
         return new SQSBatchResponse(batchItemFailures);
     }
}
 
 
 
- JavaScript
- 
            
     
        - 适用于 JavaScript 的 SDK(v3)
- 
 查看 GitHub,了解更多信息。在无服务器示例存储库中查找完整示例,并了解如何进行设置和运行。 
报告使用 JavaScript 进行 Lambda SQS 批处理项目失败。 // Node.js 20.x Lambda runtime, AWS SDK for Javascript V3
export const handler = async (event, context) => {
    const batchItemFailures = [];
    for (const record of event.Records) {
        try {
            await processMessageAsync(record, context);
        } catch (error) {
            batchItemFailures.push({ itemIdentifier: record.messageId });
        }
    }
    return { batchItemFailures };
};
async function processMessageAsync(record, context) {
    if (record.body && record.body.includes("error")) {
        throw new Error("There is an error in the SQS Message.");
    }
    console.log(`Processed message: ${record.body}`);
}
 报告使用 TypeScript 进行 Lambda SQS 批处理项目失败。 // Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
import { SQSEvent, SQSBatchResponse, Context, SQSBatchItemFailure, SQSRecord } from 'aws-lambda';
export const handler = async (event: SQSEvent, context: Context): Promise<SQSBatchResponse> => {
    const batchItemFailures: SQSBatchItemFailure[] = [];
    for (const record of event.Records) {
        try {
            await processMessageAsync(record);
        } catch (error) {
            batchItemFailures.push({ itemIdentifier: record.messageId });
        }
    }
    return {batchItemFailures: batchItemFailures};
};
async function processMessageAsync(record: SQSRecord): Promise<void> {
    if (record.body && record.body.includes("error")) {
        throw new Error('There is an error in the SQS Message.');
    }
    console.log(`Processed message ${record.body}`);
}
 
 
 
- PHP
- 
            
     
        - 适用于 PHP 的 SDK
- 
 查看 GitHub,了解更多信息。在无服务器示例存储库中查找完整示例,并了解如何进行设置和运行。 
报告使用 PHP 进行 Lambda SQS 批处理项目失败。 // Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
<?php
use Bref\Context\Context;
use Bref\Event\Sqs\SqsEvent;
use Bref\Event\Sqs\SqsHandler;
use Bref\Logger\StderrLogger;
require __DIR__ . '/vendor/autoload.php';
class Handler extends SqsHandler
{
    private StderrLogger $logger;
    public function __construct(StderrLogger $logger)
    {
        $this->logger = $logger;
    }
    /**
     * @throws JsonException
     * @throws \Bref\Event\InvalidLambdaEvent
     */
    public function handleSqs(SqsEvent $event, Context $context): void
    {
        $this->logger->info("Processing SQS records");
        $records = $event->getRecords();
        foreach ($records as $record) {
            try {
                // Assuming the SQS message is in JSON format
                $message = json_decode($record->getBody(), true);
                $this->logger->info(json_encode($message));
                // TODO: Implement your custom processing logic here
            } catch (Exception $e) {
                $this->logger->error($e->getMessage());
                // failed processing the record
                $this->markAsFailed($record);
            }
        }
        $totalRecords = count($records);
        $this->logger->info("Successfully processed $totalRecords SQS records");
    }
}
$logger = new StderrLogger();
return new Handler($logger);
 
 
 
- Python
- 
            
     
        - 适用于 Python 的 SDK (Boto3)
- 
 查看 GitHub,了解更多信息。在无服务器示例存储库中查找完整示例,并了解如何进行设置和运行。 
报告使用 Python 进行 Lambda SQS 批处理项目失败。 # Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0
def lambda_handler(event, context):
    if event:
        batch_item_failures = []
        sqs_batch_response = {}
     
        for record in event["Records"]:
            try:
                print(f"Processed message: {record['body']}")
            except Exception as e:
                batch_item_failures.append({"itemIdentifier": record['messageId']})
        
        sqs_batch_response["batchItemFailures"] = batch_item_failures
        return sqs_batch_response
 
 
 
- Ruby
- 
            
     
        - 适用于 Ruby 的 SDK
- 
 查看 GitHub,了解更多信息。在无服务器示例存储库中查找完整示例,并了解如何进行设置和运行。 
报告使用 Ruby 进行 Lambda SQS 批处理项目失败。 # Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0
require 'json'
def lambda_handler(event:, context:)
  if event
    batch_item_failures = []
    sqs_batch_response = {}
    event["Records"].each do |record|
      begin
        # process message
      rescue StandardError => e
        batch_item_failures << {"itemIdentifier" => record['messageId']}
      end
    end
    sqs_batch_response["batchItemFailures"] = batch_item_failures
    return sqs_batch_response
  end
end
 
 
 
- Rust
- 
            
     
        - 适用于 Rust 的 SDK
- 
 查看 GitHub,了解更多信息。在无服务器示例存储库中查找完整示例,并了解如何进行设置和运行。 
报告使用 Rust 进行 Lambda SQS 批处理项目失败。 // Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
use aws_lambda_events::{
    event::sqs::{SqsBatchResponse, SqsEvent},
    sqs::{BatchItemFailure, SqsMessage},
};
use lambda_runtime::{run, service_fn, Error, LambdaEvent};
async fn process_record(_: &SqsMessage) -> Result<(), Error> {
    Err(Error::from("Error processing message"))
}
async fn function_handler(event: LambdaEvent<SqsEvent>) -> Result<SqsBatchResponse, Error> {
    let mut batch_item_failures = Vec::new();
    for record in event.payload.records {
        match process_record(&record).await {
            Ok(_) => (),
            Err(_) => batch_item_failures.push(BatchItemFailure {
                item_identifier: record.message_id.unwrap(),
            }),
        }
    }
    Ok(SqsBatchResponse {
        batch_item_failures,
    })
}
#[tokio::main]
async fn main() -> Result<(), Error> {
    run(service_fn(function_handler)).await
}
 
 
 
 
如果处理失败的事件没有返回到队列,请参阅 AWS 知识中心中的 如何排查 Lambda 函数 SQS ReportBatchItemFailures 问题?。
         
            成功和失败的条件
            如果您的函数返回以下任意一项,则 Lambda 会将批处理视为完全成功:
            
            如果您的函数返回以下任意一项,则 Lambda 会将批处理视为完全失败:
            
         
         
            CloudWatch 指标
            要确定函数是否在正确报告批处理项目失败情况,您可以监控 Amazon CloudWatch 中的 NumberOfMessagesDeleted 和 ApproximateAgeOfOldestMessage Amazon SQS 指标。
            
         
        
         
            
            Powertools for AWS Lambda 中的批处理器实用程序会自动处理部分批处理响应逻辑,从而降低实施批处理故障报告的复杂性。下面是使用批处理器的示例:
            
            
                 
                 
            
                    - Python
- 
                        
                         
                            使用 AWS Lambda 批处理器处理 Amazon SQS 消息。 import json
from aws_lambda_powertools import Logger
from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType, process_partial_response
from aws_lambda_powertools.utilities.data_classes import SQSEvent
from aws_lambda_powertools.utilities.typing import LambdaContext
processor = BatchProcessor(event_type=EventType.SQS)
logger = Logger()
def record_handler(record):
    logger.info(record)
    # Your business logic here
    # Raise an exception to mark this record as failed
    
def lambda_handler(event, context: LambdaContext):
    return process_partial_response(
        event=event, 
        record_handler=record_handler, 
        processor=processor,
        context=context
    )
 
- TypeScript
- 
                        
                         
                            使用 AWS Lambda 批处理器处理 Amazon SQS 消息。 import { BatchProcessor, EventType, processPartialResponse } from '@aws-lambda-powertools/batch';
import { Logger } from '@aws-lambda-powertools/logger';
import type { SQSEvent, Context } from 'aws-lambda';
const processor = new BatchProcessor(EventType.SQS);
const logger = new Logger();
const recordHandler = async (record: any): Promise<void> => {
    logger.info('Processing record', { record });
    // Your business logic here
    // Throw an error to mark this record as failed
};
export const handler = async (event: SQSEvent, context: Context) => {
    return processPartialResponse(event, recordHandler, processor, {
        context,
    });
};