

# 在 Lambda 中处理 SQS 事件源错误
<a name="services-sqs-errorhandling"></a>

为了处理与 SQS 事件源相关的错误，Lambda 会自动使用带有回退策略的重试策略。您还可以通过配置 SQS 事件源映射来返回[部分批次响应](#services-sqs-batchfailurereporting)，从而自定义错误处理行为。

## 失败调用的回退策略
<a name="services-sqs-backoff-strategy"></a>

如果调用失败，Lambda 会在实施回退策略时尝试重试调用。回退策略略有不同，具体取决于 Lambda 的故障原因是函数代码中的错误还是节流所致。
+  如果您的**函数代码**导致了该错误，Lambda 将停止处理并重试调用。同时，Lambda 会逐渐退出，以减少分配给 Amazon SQS 事件源映射的并发量。队列的可见性超时结束后，消息将再次显示在队列中。
+ 如果调用失败是**节流**造成的，Lambda 会通过减少分配给 Amazon SQS 事件源映射的并发量来逐渐停止重试。Lambda 会继续重试该消息，直到消息的时间戳超过队列的可见性超时，此时 Lambda 会删除该消息。

## 实施部分批处理响应
<a name="services-sqs-batchfailurereporting"></a>

预设情况下,如果您的 Lambda 函数在处理某个批处理时遇到错误，则该批处理中的所有消息都会在队列中重新可见，包括 Lambda 已经成功处理的消息。因此，您的函数最终可能会多次处理同一消息。

对于处理失败的批处理，要避免重新处理其中已经成功处理的消息，您可以将事件源映射配置为仅使失败的消息重新可见。这称为部分批处理响应。要开启部分批处理响应，请在配置事件源映射时为 [FunctionResponseTypes](https://docs.aws.amazon.com/lambda/latest/api/API_UpdateEventSourceMapping.html#lambda-UpdateEventSourceMapping-request-FunctionResponseTypes) 操作指定 `ReportBatchItemFailures`。这可以让您的函数返回部分成功，从而有助于减少对记录进行不必要的重试次数。

**注意**  
Powertools for AWS Lambda 中的[批处理实用程序](https://docs.powertools.aws.dev/lambda/python/latest/utilities/batch/)会自动处理所有部分批处理响应逻辑。此实用程序简化了批处理模式的实现，并减少了正确处理批处理项目失败所需的自定义代码。它适用于 Python、Java、Typescript 和 .NET。

激活 `ReportBatchItemFailures` 后，当函数调用失败时，Lambda 不会 [缩减消息轮询范围](#services-sqs-backoff-strategy)。如果您预计某些消息会失败，并且不希望这些失败影响消息处理速率，请使用 `ReportBatchItemFailures`。

**注意**  
在使用部分批处理响应时，请记住以下几点：  
如果您函数发现了一个异常，则整个批处理将被视为完全失败。
如果您将此功能与 FIFO 队列结合使用，则您的函数应在第一次失败后停止处理消息，并返回 `batchItemFailures` 中的所有失败和未处理的消息。这有助于保持队列中消息的顺序。

**激活部分批处理报告**

1. 查看 [实施部分批处理响应的最佳实践](https://docs.aws.amazon.com/prescriptive-guidance/latest/lambda-event-filtering-partial-batch-responses-for-sqs/best-practices-partial-batch-responses.html)。

1. 运行以下命令来为您的函数激活 `ReportBatchItemFailures`。要检索事件源映射的 UUID，请运行 [list-event-source-mappings](https://docs.aws.amazon.com/cli/latest/reference/lambda/list-event-source-mappings.html) AWS CLI 命令。

   ```
   aws lambda update-event-source-mapping \
   --uuid "a1b2c3d4-5678-90ab-cdef-11111EXAMPLE" \
   --function-response-types "ReportBatchItemFailures"
   ```

1. 更新您的函数代码以捕获所有异常并在 `batchItemFailures` JSON 响应中返回处理失败的消息。`batchItemFailures` 响应必须包含消息 ID 列表，以作为 `itemIdentifier` JSON 值。

   例如，假设一个批处理有五条消息，消息 ID 分别为 `id1`、`id2`、`id3`、`id4` 和 `id5`。您的函数成功处理了 `id1`、`id3` 和 `id5`。要使消息 `id2` 和 `id4` 在队列中重新可见，您的函数应运行以下响应：

   ```
   { 
     "batchItemFailures": [ 
           {
               "itemIdentifier": "id2"
           },
           {
               "itemIdentifier": "id4"
           }
       ]
   }
   ```

   以下函数代码示例将返回批处理中处理失败消息的 ID 列表：

------
#### [ .NET ]

**适用于 .NET 的 SDK**  
 查看 GitHub，了解更多信息。在[无服务器示例](https://github.com/aws-samples/serverless-snippets/tree/main/lambda-function-sqs-report-batch-item-failures)存储库中查找完整示例，并了解如何进行设置和运行。
报告使用 .NET 进行 Lambda SQS 批处理项目失败。  

   ```
   // Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
   // SPDX-License-Identifier: Apache-2.0
   using Amazon.Lambda.Core;
   using Amazon.Lambda.SQSEvents;
   
   // Assembly attribute to enable the Lambda function's JSON input to be converted into a .NET class.
   [assembly: LambdaSerializer(typeof(Amazon.Lambda.Serialization.SystemTextJson.DefaultLambdaJsonSerializer))]
   namespace sqsSample;
   
   public class Function
   {
       public async Task<SQSBatchResponse> FunctionHandler(SQSEvent evnt, ILambdaContext context)
       {
           List<SQSBatchResponse.BatchItemFailure> batchItemFailures = new List<SQSBatchResponse.BatchItemFailure>();
           foreach(var message in evnt.Records)
           {
               try
               {
                   //process your message
                   await ProcessMessageAsync(message, context);
               }
               catch (System.Exception)
               {
                   //Add failed message identifier to the batchItemFailures list
                   batchItemFailures.Add(new SQSBatchResponse.BatchItemFailure{ItemIdentifier=message.MessageId}); 
               }
           }
           return new SQSBatchResponse(batchItemFailures);
       }
   
       private async Task ProcessMessageAsync(SQSEvent.SQSMessage message, ILambdaContext context)
       {
           if (String.IsNullOrEmpty(message.Body))
           {
               throw new Exception("No Body in SQS Message.");
           }
           context.Logger.LogInformation($"Processed message {message.Body}");
           // TODO: Do interesting work based on the new message
           await Task.CompletedTask;
       }
   }
   ```

------
#### [ Go ]

**适用于 Go 的 SDK V2**  
 查看 GitHub，了解更多信息。在[无服务器示例](https://github.com/aws-samples/serverless-snippets/tree/main/lambda-function-sqs-report-batch-item-failures)存储库中查找完整示例，并了解如何进行设置和运行。
报告使用 Go 进行 Lambda SQS 批处理项目失败。  

   ```
   // Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
   // SPDX-License-Identifier: Apache-2.0
   package main
   
   import (
   	"context"
   	"fmt"
   	"github.com/aws/aws-lambda-go/events"
   	"github.com/aws/aws-lambda-go/lambda"
   )
   
   func handler(ctx context.Context, sqsEvent events.SQSEvent) (map[string]interface{}, error) {
   	batchItemFailures := []map[string]interface{}{}
   
   	for _, message := range sqsEvent.Records {
   		if len(message.Body) > 0 {
   			// Your message processing condition here
   			fmt.Printf("Successfully processed message: %s\n", message.Body)
   		} else {
   			// Message processing failed
   			fmt.Printf("Failed to process message %s\n", message.MessageId)
   			batchItemFailures = append(batchItemFailures, map[string]interface{}{"itemIdentifier": message.MessageId})
   		}
   	}
   
   	sqsBatchResponse := map[string]interface{}{
   		"batchItemFailures": batchItemFailures,
   	}
   	return sqsBatchResponse, nil
   }
   
   func main() {
   	lambda.Start(handler)
   }
   ```

------
#### [ Java ]

**适用于 Java 的 SDK 2.x**  
 查看 GitHub，了解更多信息。在[无服务器示例](https://github.com/aws-samples/serverless-snippets/tree/main/lambda-function-sqs-report-batch-item-failures)存储库中查找完整示例，并了解如何进行设置和运行。
报告使用 Java 进行 Lambda SQS 批处理项目失败。  

   ```
   // Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
   // SPDX-License-Identifier: Apache-2.0
   import com.amazonaws.services.lambda.runtime.Context;
   import com.amazonaws.services.lambda.runtime.RequestHandler;
   import com.amazonaws.services.lambda.runtime.events.SQSEvent;
   import com.amazonaws.services.lambda.runtime.events.SQSBatchResponse;
    
   import java.util.ArrayList;
   import java.util.List;
    
   public class ProcessSQSMessageBatch implements RequestHandler<SQSEvent, SQSBatchResponse> {
       @Override
       public SQSBatchResponse handleRequest(SQSEvent sqsEvent, Context context) {
            List<SQSBatchResponse.BatchItemFailure> batchItemFailures = new ArrayList<SQSBatchResponse.BatchItemFailure>();
   
            for (SQSEvent.SQSMessage message : sqsEvent.getRecords()) {
                try {
                    //process your message
                } catch (Exception e) {
                    //Add failed message identifier to the batchItemFailures list
                    batchItemFailures.add(new SQSBatchResponse.BatchItemFailure(message.getMessageId()));
                }
            }
            return new SQSBatchResponse(batchItemFailures);
        }
   }
   ```

------
#### [ JavaScript ]

**SDK for JavaScript（v3）**  
 查看 GitHub，了解更多信息。在[无服务器示例](https://github.com/aws-samples/serverless-snippets/tree/main/lambda-function-sqs-report-batch-item-failures)存储库中查找完整示例，并了解如何进行设置和运行。
报告使用 JavaScript 进行 Lambda SQS 批处理项目失败。  

   ```
   // Node.js 20.x Lambda runtime, AWS SDK for Javascript V3
   export const handler = async (event, context) => {
       const batchItemFailures = [];
       for (const record of event.Records) {
           try {
               await processMessageAsync(record, context);
           } catch (error) {
               batchItemFailures.push({ itemIdentifier: record.messageId });
           }
       }
       return { batchItemFailures };
   };
   
   async function processMessageAsync(record, context) {
       if (record.body && record.body.includes("error")) {
           throw new Error("There is an error in the SQS Message.");
       }
       console.log(`Processed message: ${record.body}`);
   }
   ```
报告使用 TypeScript 进行 Lambda SQS 批处理项目失败。  

   ```
   // Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
   // SPDX-License-Identifier: Apache-2.0
   import { SQSEvent, SQSBatchResponse, Context, SQSBatchItemFailure, SQSRecord } from 'aws-lambda';
   
   export const handler = async (event: SQSEvent, context: Context): Promise<SQSBatchResponse> => {
       const batchItemFailures: SQSBatchItemFailure[] = [];
   
       for (const record of event.Records) {
           try {
               await processMessageAsync(record);
           } catch (error) {
               batchItemFailures.push({ itemIdentifier: record.messageId });
           }
       }
   
       return {batchItemFailures: batchItemFailures};
   };
   
   async function processMessageAsync(record: SQSRecord): Promise<void> {
       if (record.body && record.body.includes("error")) {
           throw new Error('There is an error in the SQS Message.');
       }
       console.log(`Processed message ${record.body}`);
   }
   ```

------
#### [ PHP ]

**适用于 PHP 的 SDK**  
 查看 GitHub，了解更多信息。在[无服务器示例](https://github.com/aws-samples/serverless-snippets/tree/main/lambda-function-sqs-report-batch-item-failures)存储库中查找完整示例，并了解如何进行设置和运行。
报告使用 PHP 进行 Lambda SQS 批处理项目失败。  

   ```
   // Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
   // SPDX-License-Identifier: Apache-2.0
   <?php
   
   use Bref\Context\Context;
   use Bref\Event\Sqs\SqsEvent;
   use Bref\Event\Sqs\SqsHandler;
   use Bref\Logger\StderrLogger;
   
   require __DIR__ . '/vendor/autoload.php';
   
   class Handler extends SqsHandler
   {
       private StderrLogger $logger;
       public function __construct(StderrLogger $logger)
       {
           $this->logger = $logger;
       }
   
       /**
        * @throws JsonException
        * @throws \Bref\Event\InvalidLambdaEvent
        */
       public function handleSqs(SqsEvent $event, Context $context): void
       {
           $this->logger->info("Processing SQS records");
           $records = $event->getRecords();
   
           foreach ($records as $record) {
               try {
                   // Assuming the SQS message is in JSON format
                   $message = json_decode($record->getBody(), true);
                   $this->logger->info(json_encode($message));
                   // TODO: Implement your custom processing logic here
               } catch (Exception $e) {
                   $this->logger->error($e->getMessage());
                   // failed processing the record
                   $this->markAsFailed($record);
               }
           }
           $totalRecords = count($records);
           $this->logger->info("Successfully processed $totalRecords SQS records");
       }
   }
   
   $logger = new StderrLogger();
   return new Handler($logger);
   ```

------
#### [ Python ]

**适用于 Python 的 SDK（Boto3）**  
 查看 GitHub，了解更多信息。在[无服务器示例](https://github.com/aws-samples/serverless-snippets/tree/main/lambda-function-sqs-report-batch-item-failures)存储库中查找完整示例，并了解如何进行设置和运行。
报告使用 Python 进行 Lambda SQS 批处理项目失败。  

   ```
   # Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
   # SPDX-License-Identifier: Apache-2.0
   
   def lambda_handler(event, context):
       if event:
           batch_item_failures = []
           sqs_batch_response = {}
        
           for record in event["Records"]:
               try:
                   print(f"Processed message: {record['body']}")
               except Exception as e:
                   batch_item_failures.append({"itemIdentifier": record['messageId']})
           
           sqs_batch_response["batchItemFailures"] = batch_item_failures
           return sqs_batch_response
   ```

------
#### [ Ruby ]

**适用于 Ruby 的 SDK**  
 查看 GitHub，了解更多信息。在[无服务器示例](https://github.com/aws-samples/serverless-snippets/tree/main/integration-sqs-to-lambda-with-batch-item-handling)存储库中查找完整示例，并了解如何进行设置和运行。
报告使用 Ruby 进行 Lambda SQS 批处理项目失败。  

   ```
   # Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
   # SPDX-License-Identifier: Apache-2.0
   require 'json'
   
   def lambda_handler(event:, context:)
     if event
       batch_item_failures = []
       sqs_batch_response = {}
   
       event["Records"].each do |record|
         begin
           # process message
         rescue StandardError => e
           batch_item_failures << {"itemIdentifier" => record['messageId']}
         end
       end
   
       sqs_batch_response["batchItemFailures"] = batch_item_failures
       return sqs_batch_response
     end
   end
   ```

------
#### [ Rust ]

**适用于 Rust 的 SDK**  
 查看 GitHub，了解更多信息。在[无服务器示例](https://github.com/aws-samples/serverless-snippets/tree/main/lambda-function-sqs-report-batch-item-failures)存储库中查找完整示例，并了解如何进行设置和运行。
报告使用 Rust 进行 Lambda SQS 批处理项目失败。  

   ```
   // Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
   // SPDX-License-Identifier: Apache-2.0
   use aws_lambda_events::{
       event::sqs::{SqsBatchResponse, SqsEvent},
       sqs::{BatchItemFailure, SqsMessage},
   };
   use lambda_runtime::{run, service_fn, Error, LambdaEvent};
   
   async fn process_record(_: &SqsMessage) -> Result<(), Error> {
       Err(Error::from("Error processing message"))
   }
   
   async fn function_handler(event: LambdaEvent<SqsEvent>) -> Result<SqsBatchResponse, Error> {
       let mut batch_item_failures = Vec::new();
       for record in event.payload.records {
           match process_record(&record).await {
               Ok(_) => (),
               Err(_) => batch_item_failures.push(BatchItemFailure {
                   item_identifier: record.message_id.unwrap(),
               }),
           }
       }
   
       Ok(SqsBatchResponse {
           batch_item_failures,
       })
   }
   
   #[tokio::main]
   async fn main() -> Result<(), Error> {
       run(service_fn(function_handler)).await
   }
   ```

------

如果处理失败的事件没有返回到队列，请参阅 AWS 知识中心中的 [如何排查 Lambda 函数 SQS ReportBatchItemFailures 问题？](https://aws.amazon.com/premiumsupport/knowledge-center/lambda-sqs-report-batch-item-failures/)。

### 成功和失败的条件
<a name="sqs-batchfailurereporting-conditions"></a>

如果您的函数返回以下任意一项，则 Lambda 会将批处理视为完全成功：
+ 空的 `batchItemFailures` 列表
+ Null `batchItemFailures` 列表
+ 空的 `EventResponse`
+ Null `EventResponse`

如果您的函数返回以下任意一项，则 Lambda 会将批处理视为完全失败：
+ JSON 响应无效
+ 空字符串 `itemIdentifier`
+ Null `itemIdentifier`
+ 包含错误密钥名的 `itemIdentifier`
+ 具有某个消息 ID 的 `itemIdentifier` 值不存在

### CloudWatch 指标
<a name="sqs-batchfailurereporting-metrics"></a>

要确定函数是否在正确报告批处理项目失败情况，您可以监控 Amazon CloudWatch 中的 `NumberOfMessagesDeleted` 和 `ApproximateAgeOfOldestMessage` Amazon SQS 指标。
+ `NumberOfMessagesDeleted` 会跟踪从队列中删除的消息数量。如果该值降至 0，则表明您的函数响应没有正确返回失败的消息。
+ `ApproximateAgeOfOldestMessage` 会跟踪最早的消息在队列中停留的时间。如果此指标急剧增加，则可能表明您的函数没有正确返回失败的消息。

### 使用 Powertools for AWS Lambda 批处理器
<a name="services-sqs-batchfailurereporting-powertools"></a>

Powertools for AWS Lambda 中的批处理器实用程序会自动处理部分批处理响应逻辑，从而降低实施批处理故障报告的复杂性。下面是使用批处理器的示例：

**Python**  
有关完整的示例和设置说明，请参阅[批处理器文档](https://docs.powertools.aws.dev/lambda/python/latest/utilities/batch/)。
使用 AWS Lambda 批处理器处理 Amazon SQS 消息。  

```
import json
from aws_lambda_powertools import Logger
from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType, process_partial_response
from aws_lambda_powertools.utilities.data_classes import SQSEvent
from aws_lambda_powertools.utilities.typing import LambdaContext

processor = BatchProcessor(event_type=EventType.SQS)
logger = Logger()

def record_handler(record):
    logger.info(record)
    # Your business logic here
    # Raise an exception to mark this record as failed
    
def lambda_handler(event, context: LambdaContext):
    return process_partial_response(
        event=event, 
        record_handler=record_handler, 
        processor=processor,
        context=context
    )
```

**TypeScript**  
有关完整的示例和设置说明，请参阅[批处理器文档](https://docs.aws.amazon.com/powertools/typescript/latest/features/batch/)。
使用 AWS Lambda 批处理器处理 Amazon SQS 消息。  

```
import { BatchProcessor, EventType, processPartialResponse } from '@aws-lambda-powertools/batch';
import { Logger } from '@aws-lambda-powertools/logger';
import type { SQSEvent, Context } from 'aws-lambda';

const processor = new BatchProcessor(EventType.SQS);
const logger = new Logger();

const recordHandler = async (record: any): Promise<void> => {
    logger.info('Processing record', { record });
    // Your business logic here
    // Throw an error to mark this record as failed
};

export const handler = async (event: SQSEvent, context: Context) => {
    return processPartialResponse(event, recordHandler, processor, {
        context,
    });
};
```