

# 将 Lambda 与 Amazon SQS 结合使用
<a name="with-sqs"></a>

**注意**  
如果想要将数据发送到 Lambda 函数以外的目标，或要在发送数据之前丰富数据，请参阅 [Amazon EventBridge Pipes](https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-pipes.html)（Amazon EventBridge 管道）。

您可以使用 Lambda 函数来处理某个 Amazon Simple Queue Service（Amazon SQS）队列中的消息。Lambda 支持[事件源映射](invocation-eventsourcemapping.md)的[标准队列](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/standard-queues.html)和[先进先出（FIFO）队列](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/FIFO-queues.html)。您也可以使用预置模式为您的 Amazon SQS 事件源映射分配专用的轮询资源。Lambda 函数和 Amazon SQS 队列必须位于同一 AWS 区域，即便二者可能位于[不同的 AWS 账户](with-sqs-cross-account-example.md)。

在处理 Amazon SQS 消息时，您需要实施部分批处理响应逻辑，以防止在批处理中的某些消息失败时重试成功处理的消息。Powertools for AWS Lambda 中的[批处理器实用程序](https://docs.powertools.aws.dev/lambda/python/latest/utilities/batch/)通过自动处理部分批处理响应逻辑简化了此实现，减少了开发时间并提高了可靠性。

**Topics**
+ [了解 Amazon SQS 事件源映射的轮询和批处理行为](#sqs-polling-behavior)
+ [对 Amazon SQS 事件源映射使用预置模式](#sqs-provisioned-mode)
+ [为 Amazon SQS 事件源映射配置预置模式](#sqs-configuring-provisioned-mode)
+ [示例标准队列消息事件](#example-standard-queue-message-event)
+ [示例 FIFO 队列消息事件](#sample-fifo-queues-message-event)
+ [创建和配置 Amazon SQS 事件源映射](services-sqs-configure.md)
+ [为 SQS 事件源映射配置扩展行为](services-sqs-scaling.md)
+ [在 Lambda 中处理 SQS 事件源错误](services-sqs-errorhandling.md)
+ [Amazon SQS 事件源映射的 Lambda 参数](services-sqs-parameters.md)
+ [对 Amazon SQS 事件源使用事件筛选](with-sqs-filtering.md)
+ [教程：将 Lambda 与 Amazon SQS 结合使用](with-sqs-example.md)
+ [教程：将跨账户 Amazon SQS 队列用作事件源](with-sqs-cross-account-example.md)

## 了解 Amazon SQS 事件源映射的轮询和批处理行为
<a name="sqs-polling-behavior"></a>

使用 Amazon SQS 事件源映射，Lambda 会轮询队列并通过事件[同步](invocation-sync.md)调用您的函数。每个事件可以包含来自队列的一批多条消息。Lambda 每次接收一批这些事件，并为每个批次调用一次您的函数。当您的函数成功处理一个批次后，Lambda 就会将其消息从队列中删除。

当 Lambda 接收某个批次时，消息将保留在队列中，但会根据队列的[可见性超时](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-visibility-timeout.html)长度隐藏。如果您的函数成功处理了批次中的所有消息，Lambda 会将消息从队列中删除。预设情况下，如果您的函数在处理某个批处理时遇到错误，则该批处理中的所有消息都会在可见性超时过期后在队列中重新可见。因此，函数代码必须能够多次处理同一条消息，而不会产生意外的副作用。

**警告**  
Lambda 事件源映射至少处理每个事件一次，有可能出现重复处理记录的情况。为避免与重复事件相关的潜在问题，我们强烈建议您将函数代码设为幂等性。要了解更多信息，请参阅 AWS 知识中心的[如何让我的 Lambda 函数保持幂等性](https://repost.aws/knowledge-center/lambda-function-idempotent)。

要防止 Lambda 多次处理消息，您可以将事件源映射配置为在函数响应中包含[批次项目失败](services-sqs-errorhandling.md#services-sqs-batchfailurereporting)，也可以在 Lambda 函数成功处理消息后使用 [DeleteMessage](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_DeleteMessage.html) API 将消息从队列中删除。

有关 Lambda 支持的 SQS 事件源映射配置参数的更多信息，请参阅 [创建 SQS 事件源映射](services-sqs-configure.md#events-sqs-eventsource)。

## 对 Amazon SQS 事件源映射使用预置模式
<a name="sqs-provisioned-mode"></a>

对于需要微调事件源映射吞吐量的工作负载，您可以使用预调配模式。在预调配模式下，您可以为预调配事件轮询器数量定义最小和最大限制。这些预调配事件轮询器专用于事件源映射，并且可以通过响应式自动扩缩处理意外的消息激增。配置了预置模式的 Amazon SQS 事件源映射的扩展速度比默认的 Amazon SQS 事件源映射功能快 3 倍（每分钟最多 1000 次并发调用），并且支持高 16 倍的并发性（最多 20000 次并发调用）。我们建议，对于具有严格性能要求的 Amazon SQS 事件驱动型工作负载（例如处理市场数据源的金融服务公司、提供实时个性化推荐的电子商务平台以及管理实时玩家互动的游戏公司等），您应采用预置模式。使用预调配模式会产生额外成本。有关详细定价，请参阅 [AWS Lambda 定价](https://aws.amazon.com/lambda/pricing/)。

每个事件轮询器在预置模式下能处理最高 1 MB/s 的吞吐量、最多 10 次并发调用，或者每秒最多进行 10 次 Amazon SQS 轮询 API 调用。最小事件轮询器数量（MinimumPollers）的可接受值范围介于 2 到 200 之间（默认为 2）。最大事件轮询器数量（MaximumPollers）的可接受值范围介于 2 到 2000 之间（默认为 200）。MaximumPollers 必须大于或等于 MinimumPollers。

### 确定所需的事件轮询器
<a name="sqs-determining-event-pollers"></a>

要估算在使用 SQS ESM 的预置模式下确保最佳消息处理性能所需的事件轮询器数量，请为您的应用程序收集以下指标：每秒需要低延迟处理的峰值 SQS 事件数量、平均 SQS 事件有效载荷大小、平均 Lambda 函数执行持续时间以及配置的批处理大小。

首先，您可以使用以下公式来估算事件轮询器为您的工作负载支持的每秒 SQS 事件数量（EPS）：

```
EPS per event poller = 
        minimum(
            ceiling(1024 / average event size in KB),
            ceiling(10 / average function duration in seconds) * batch size, 
            min(100, 10 * batch size)
                )
```

然后，您可以使用以下公式计算所需的最小轮询器数量。此计算可确保您预置足够的容量来满足峰值流量要求。

```
Required event pollers = (Peak number of events per second in Queue) / EPS per event poller
```

考虑这样一个工作负载，其默认批处理大小为 10，平均事件大小为 3KB，平均函数执行时间为 100 ms，并且需要每秒处理 1000 个事件。在这种情况下，每个事件轮询器每秒将支持大约 100 个事件（EPS）。因此，您应将最小轮询器数量设置为 10，以充分满足您的峰值流量需求。如果您的工作负载具有相同的特性，但函数平均持续时间为 1 秒，则每个轮询器仅能支持 10 个 EPS，这就要求您将最小轮询器数量配置为 100，以在低延迟的情况下每秒支持 1000 个事件。

我们建议使用 10 或更高的默认批处理大小，以最大限度地提高预置模式事件轮询器的效率。更大的批次大小使得每个轮询器在每次调用时能够处理更多的事件，从而提高了吞吐量和成本效益。在规划事件轮询器容量时，请考虑潜在的流量峰值，并考虑将 minimumPollers 值设置为略高于计算得出的最小值，以提供缓冲区。此外，持续监控您的工作负载特征的变化情况，因为消息大小、函数持续时间或流量模式的改变可能会需要对您的事件轮询器配置进行调整，以保持最佳性能和成本效益。为了进行精确的容量规划，我们建议您测试您的特定工作负载，以确定每个事件轮询器可以驱动的实际 EPS。

## 为 Amazon SQS 事件源映射配置预置模式
<a name="sqs-configuring-provisioned-mode"></a>

您可以使用控制台或 Lambda API 为 Amazon SQS 事件源映射配置预置模式。

**为现有的 Amazon SQS 事件源映射配置预置模式（控制台）**

1. 打开 Lamba 控制台的[函数](https://console.aws.amazon.com/lambda/home#/functions)页面。

1. 选择具有要为其配置预置模式的 Amazon SQS 事件源映射的函数。

1. 选择**配置**，然后选择**触发器**。

1. 选择要为其配置预置模式的 Amazon SQS 事件源映射，然后选择**编辑**。

1. 在**事件源映射配置**下，选择**配置预调配模式**。
   + 对于**最少事件轮询器**，输入介于 2 到 200 之间的值。如果未指定值，则 Lambda 将选择默认值 2。
   + 对于**最大事件轮询器**，输入介于 2 到 2000 之间的值。此值必须大于或等于**最少事件轮询器**的值。如果未指定值，则 Lambda 将选择默认值 200。

1. 选择**保存**。

您可以使用 `EventSourceMappingConfiguration` 中的 `ProvisionedPollerConfig` 对象以编程方式配置预置模式。例如，以下 `UpdateEventSourceMapping` CLI 命令将 `MinimumPollers` 值配置为 5，将 `MaximumPollers` 值配置为 100。

```
aws lambda update-event-source-mapping \
    --uuid a1b2c3d4-5678-90ab-cdef-EXAMPLE11111 \
    --provisioned-poller-config '{"MinimumPollers": 5, "MaximumPollers": 100}'
```

配置预调配模式后，您可以通过监控 `ProvisionedPollers` 指标来观测事件轮询器对您的工作负载的使用情况。有关更多信息，请参阅事件源映射指标。

要禁用预置模式并返回默认（按需）模式，您可以使用以下 `UpdateEventSourceMapping` CLI 命令：

```
aws lambda update-event-source-mapping \
    --uuid a1b2c3d4-5678-90ab-cdef-EXAMPLE11111 \
    --provisioned-poller-config '{}'
```

**注意**  
预置模式不能与最大并发设置结合使用。使用预置模式时，您可以通过事件轮询器的最大数量来控制最大并发性。

有关配置预置模式的更多信息，请参阅[创建和配置 Amazon SQS 事件源映射](services-sqs-configure.md)。

## 示例标准队列消息事件
<a name="example-standard-queue-message-event"></a>

**Example Amazon SQS 消息事件（标准队列）**  

```
{
    "Records": [
        {
            "messageId": "059f36b4-87a3-44ab-83d2-661975830a7d",
            "receiptHandle": "AQEBwJnKyrHigUMZj6rYigCgxlaS3SLy0a...",
            "body": "Test message.",
            "attributes": {
                "ApproximateReceiveCount": "1",
                "SentTimestamp": "1545082649183",
                "SenderId": "AIDAIENQZJOLO23YVJ4VO",
                "ApproximateFirstReceiveTimestamp": "1545082649185"
            },
            "messageAttributes": {
                "myAttribute": {
                    "stringValue": "myValue", 
                    "stringListValues": [], 
                    "binaryListValues": [], 
                    "dataType": "String"
                }
            },
            "md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3",
            "eventSource": "aws:sqs",
            "eventSourceARN": "arn:aws:sqs:us-east-2:123456789012:my-queue",
            "awsRegion": "us-east-2"
        },
        {
            "messageId": "2e1424d4-f796-459a-8184-9c92662be6da",
            "receiptHandle": "AQEBzWwaftRI0KuVm4tP+/7q1rGgNqicHq...",
            "body": "Test message.",
            "attributes": {
                "ApproximateReceiveCount": "1",
                "SentTimestamp": "1545082650636",
                "SenderId": "AIDAIENQZJOLO23YVJ4VO",
                "ApproximateFirstReceiveTimestamp": "1545082650649"
            },
            "messageAttributes": {},
            "md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3",
            "eventSource": "aws:sqs",
            "eventSourceARN": "arn:aws:sqs:us-east-2:123456789012:my-queue",
            "awsRegion": "us-east-2"
        }
    ]
}
```

默认情况下，Lambda 将一次性轮询队列中最多 10 条消息，并将该批次发送到函数。为避免在记录数量较少的情况下调用该函数，您可以配置批次时段，将事件源配置为缓冲最多五分钟的记录。在调用函数之前，Lambda 将继续轮询标准队列中的消息，直到批次时段到期、达到[调用有效负载大小配额](gettingstarted-limits.md)或达到配置的最大批次大小为止。

如果您使用的是批处理窗口，并且 SQS 队列包含的流量非常低，Lambda 可能会等待最多 20 秒钟才能调用您的函数。即使您将批处理窗口设置为低于 20 秒，情况依然如此。

**注意**  
在 Java 中，反序列化 JSON 时可能会遇到空指针错误。这可能要归因于 JSON 对象映射器转换“Records”和“eventSourceARN”大小写的方式。

## 示例 FIFO 队列消息事件
<a name="sample-fifo-queues-message-event"></a>

对于 FIFO 队列，记录包含与重复数据消除和顺序相关的其他属性。

**Example Amazon SQS 消息事件（FIFO 队列）**  

```
{
    "Records": [
        {
            "messageId": "11d6ee51-4cc7-4302-9e22-7cd8afdaadf5",
            "receiptHandle": "AQEBBX8nesZEXmkhsmZeyIE8iQAMig7qw...",
            "body": "Test message.",
            "attributes": {
                "ApproximateReceiveCount": "1",
                "SentTimestamp": "1573251510774",
                "SequenceNumber": "18849496460467696128",
                "MessageGroupId": "1",
                "SenderId": "AIDAIO23YVJENQZJOL4VO",
                "MessageDeduplicationId": "1",
                "ApproximateFirstReceiveTimestamp": "1573251510774"
            },
            "messageAttributes": {},
            "md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3",
            "eventSource": "aws:sqs",
            "eventSourceARN": "arn:aws:sqs:us-east-2:123456789012:fifo.fifo",
            "awsRegion": "us-east-2"
        }
    ]
}
```

# 创建和配置 Amazon SQS 事件源映射
<a name="services-sqs-configure"></a>

要使用 Lambda 处理 Amazon SQS 消息，请使用适当的设置配置您的队列，然后创建 Lambda 事件源映射。

**Topics**
+ [配置队列以便用于 Lambda](#events-sqs-queueconfig)
+ [设置 Lambda 执行角色权限](#events-sqs-permissions)
+ [创建 SQS 事件源映射](#events-sqs-eventsource)

## 配置队列以便用于 Lambda
<a name="events-sqs-queueconfig"></a>

如果您没有现有的 Amazon SQS 队列，则[创建一个](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-configure-create-queue.html)队列，用作 Lambda 函数的事件源。Lambda 函数和 Amazon SQS 队列必须位于同一 AWS 区域，即便二者可能位于[不同的 AWS 账户](with-sqs-cross-account-example.md)。

为使函数有时间处理每批记录，请将源队列的[可见性超时](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-visibility-timeout.html)设置为函数[配置超时](configuration-timeout.md)的至少六倍。这一额外的时间允许 Lambda 在您的函数处理之前的批次期间遇到限流时进行重试。

**注意**  
函数超时必须小于或等于于队列的可见性超时。当创建或更新事件源映射时，Lambda 会验证此要求，函数超时超过队列的可见性超时则会返回错误。

默认情况下，如果 Lambda 在处理某个批次期间的任何时候遇到错误，则该批次中的所有消息都会返回到队列。[可见性超时](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-visibility-timeout.html)后，Lambda 将再次看到这些消息。您可以将事件源映射配置为使用[部分批次响应](services-sqs-errorhandling.md#services-sqs-batchfailurereporting)，以仅使失败的消息返回队列。此外，如果函数多次都未能处理某条消息，则 Amazon SQS 可以将其发送到某个[死信队列](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-dead-letter-queues.html)。建议将源队列的[重新驱动策略](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-dead-letter-queues.html#policies-for-dead-letter-queues)的 `maxReceiveCount` 设置为至少 5。这让 Lambda 在将失败的消息直接发送到死信队列之前有几次重试的机会。

## 设置 Lambda 执行角色权限
<a name="events-sqs-permissions"></a>

[AWSLambdaSQSQueueExecutionRole](https://docs.aws.amazon.com/aws-managed-policy/latest/reference/AWSLambdaSQSQueueExecutionRole.html) AWS 托管策略包含 Lambda 从您的 Amazon SQS 队列中读取所需的权限。您可以[将此托管策略添加](lambda-intro-execution-role.md)到您的函数的执行角色。

或者，如果您使用的是加密队列，则还需要为执行角色添加以下权限：
+ [kms:Decrypt](https://docs.aws.amazon.com/kms/latest/APIReference/API_Decrypt.html)

## 创建 SQS 事件源映射
<a name="events-sqs-eventsource"></a>

创建事件源映射以指示 Lambda 将队列中的项目发送到 Lambda 函数。您可以创建多个事件源映射，以使用单个函数处理来自多个队列的项目。当 Lambda 调用目标函数时，事件可以包含多个项目（最多为可配置的最大*批处理大小*）。

要将您的函数配置为从 Amazon SQS 中读取，请将 [AWSLambdaSQSQueueExecutionRole](https://docs.aws.amazon.com/aws-managed-policy/latest/reference/AWSLambdaSQSQueueExecutionRole.html) AWS 托管策略附加到您的执行角色。然后，使用以下步骤从控制台创建 **SQS** 事件源映射。

**要添加权限并创建触发器**

1. 打开 Lamba 控制台的[函数](https://console.aws.amazon.com/lambda/home#/functions)页面。

1. 选择一个函数的名称。

1. 选择 **Configuration**（配置）选项卡，然后选择 **Permissions**（权限）。

1. 在**角色名称**下，选择至执行角色的链接。此角色将在 IAM 控制台中打开角色。  
![\[\]](http://docs.aws.amazon.com/zh_cn/lambda/latest/dg/images/execution-role.png)

1. 选择**添加权限**，然后选择**附加策略**。  
![\[\]](http://docs.aws.amazon.com/zh_cn/lambda/latest/dg/images/attach-policies.png)

1. 在搜索字段中输入 `AWSLambdaSQSQueueExecutionRole`。向执行角色添加此策略。这是一项 AWS 托管策略，其中包含您的函数从 Amazon SQS 队列中读取所需的权限。有关此策略的更多信息，请参阅《AWS Managed Policy Reference》**中的 [AWSLambdaSQSQueueExecutionRole](https://docs.aws.amazon.com/aws-managed-policy/latest/reference/AWSLambdaSQSQueueExecutionRole.html)。

1. 在 Lambda 控制台中返回您的函数。在 **Function overview**（函数概览）下，选择 **Add trigger**（添加触发器）。  
![\[\]](http://docs.aws.amazon.com/zh_cn/lambda/latest/dg/images/add-trigger.png)

1. 选择触发器类型。

1. 配置必填选项，然后选择 **Add**（添加）。

Lambda 支持 Amazon SQS 事件源的以下配置选项：

**SQS 队列**  
要从中读取记录的 Amazon SQS 队列。Lambda 函数和 Amazon SQS 队列必须位于同一 AWS 区域，即便二者可能位于[不同的 AWS 账户](with-sqs-cross-account-example.md)。

**启用触发器**  
事件源映射的状态。**Enable trigger**（启用触发器）默认处于选中状态。

**批次大小**  
每个批次中要发送给函数的最大记录数。对于标准队列，这最高可为 10,000 条记录。对于 FIFO 队列，最大值为 10。对于超过 10 的批处理大小，还必须将批处理时间（`MaximumBatchingWindowInSeconds`）设置为至少 1 秒。  
配置[函数超时](https://serverlessland.com/content/service/lambda/guides/aws-lambda-operator-guide/configurations#timeouts)，以允许有足够的时间来处理整个批次的项目。如果项目处理需要很长时间，请选择一个较小的批处理大小。大批量处理可以提高非常快速或拥有大量开销的工作负载的效率。如果您在函数上配置了[预留并发](configuration-concurrency.md)，请将最小并发执行数设置为 5，以降低在 Lambda 调用函数时出现节流错误的几率。  
Lambda 通过单个调用将批次中的所有记录传递给函数，前提是事件的总大小未超出同步调用的[调用有效负载大小配额](gettingstarted-limits.md)（6 MB）。Lambda 和 Amazon SQS 都会为每条记录生成元数据。这一额外的元数据将会计入总有效负载大小，并且可能导致批处理中发送的记录总数低于配置的批处理大小。Amazon SQS 发送的元数据字段的长度是可变的。有关 Amazon SQS 元数据字段的更多信息，请参阅*《Amazon Simple Queue Service API 参考》*中的 [ReceiveMessage](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_ReceiveMessage.html) API 操作文档。

**Batch 时间**  
在调用函数之前收集记录的最长时间（以秒为单位）。它仅适用于标准队列。  
如果您使用的批次时段大于 0 秒，则必须考虑队列[可见性超时](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-visibility-timeout.html)中增加的处理时间。我们建议将队列可见性超时设置为[函数超时](configuration-timeout.md)的六倍，加上 `MaximumBatchingWindowInSeconds` 的值。这使 Lambda 函数有时间处理每个批次的事件，并在出现节流错误时重试。  
当消息可用时，Lambda 开始批量处理消息。Lambda 通过五次并发调用您的函数开始一次处理五个批处理。如果仍有消息可用，则 Lambda 最多每分钟添加 300 次并发函数调用，最多为 1250 次并发调用。使用预置模式时，每个事件轮询器能处理最高 1 MB/s 的吞吐量、最多 10 次并发调用，或者每秒最多进行 10 次 Amazon SQS 轮询 API 调用。Lambda 会根据您所配置的最小值和最大值来扩展事件轮询器的数量，能够迅速将并发调用数量提升至最多每分钟 1000 次，从而实现对 Amazon SQS 事件的低延迟处理。您可以通过这些最小和最大事件轮询器数量设置来控制扩展和并发性。要了解函数扩展和并发的更多信息，请参阅 [了解 Lambda 函数扩展](lambda-concurrency.md)。  
要处理更多消息，您可以优化 Lambda 函数以提高吞吐量。有关更多信息，请参阅[了解 AWS Lambda 如何使用 Amazon SQS 标准队列进行扩展](https://aws.amazon.com/blogs/compute/understanding-how-aws-lambda-scales-when-subscribed-to-amazon-sqs-queues/#:~:text=If there are more messages,messages from the SQS queue.)。

**筛选条件**  
添加筛选条件以控制 Lambda 将哪些事件发送给函数进行处理。有关更多信息，请参阅 [控制 Lambda 向您的函数发送的事件](invocation-eventfiltering.md)。

**最大并发数**  
事件源可调用的最大并发函数数量。不能与已启用的预置模式一起使用。有关更多信息，请参阅 [为 Amazon SQS 事件源配置最大并发](services-sqs-scaling.md#events-sqs-max-concurrency)。

**预置模式**  
启用后，将为您的事件源映射分配专用的轮询资源。您可以配置事件轮询器的最小数量（2-200）和最大数量（2-2000）。每个事件轮询器能处理最高 1 MB/秒 的吞吐量、最多 10 次并发调用，或者每秒最多进行 10 次 10 Amazon SQS 轮询调用。  
注意：您不能同时使用预置模式和最大并发性这两种配置。启用预置模式后，使用最大轮询器数量设置来控制并发性。

# 为 SQS 事件源映射配置扩展行为
<a name="services-sqs-scaling"></a>

您可以通过最大并发数设置或者启用预置模式来控制您的 Amazon SQS 事件源映射的扩展行为。它们是互斥选项。

默认情况下，Lambda 会根据消息量自动扩展事件轮询器。启用预置模式时，您可以分配最小和最大数量的专用轮询资源，这些资源随时可以处理预期的流量模式。这使您可以通过两种方式优化事件源映射的性能：
+ 标准模式（默认）：Lambda 会自动管理扩展，从少量轮询器开始，然后再根据工作负载纵向扩展或缩减。
+ 预置模式：您可以通过设置最小和最大限制配置专用轮询资源，从而使扩展速度加快 3 倍，处理容量最多提高 16 倍。

对于标准队列，Lambda 使用[长轮询](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-short-and-long-polling.html#sqs-long-polling)来轮询一个队列，直到它变为活动状态。当消息可用时，Lambda 会通过函数的五次并发调用开始一次处理五个批次。如果仍有消息可用，则 Lambda 增加批量读取的进程数，最多每分钟增加 300 次并发调用。事件源映射可以同时处理的最大调用数量为 1250。当流量较低时，Lambda 会将处理规模缩减为 5 个并发调用，并且可优化为少至 2 个并发调用，以减少 Amazon SQS 调用和相应的成本。但是，当您启用最大并发设置时，此优化不可用。

对于 FIFO 队列，Lambda 按照接收消息的顺序向函数发送消息。向 FIFO 队列发送消息时，需要指定[消息组 ID](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/using-messagegroupid-property.html)。Amazon SQS 确保同一组中的消息按顺序传递到 Lambda。当 Lambda 按批次读取消息时，每个批次可能包含来自多个消息组的消息，但消息的顺序保持不变。如果函数返回错误，函数会对受影响的消息尝试所有重试，然后 Lambda 才会接收来自同一个组的其他消息。

使用预置模式时，每个事件轮询器能处理最高 1 MB/秒的吞吐量、最多 10 次并发调用，或者每秒最多进行 10 次 Amazon SQS 轮询 API 调用。Lambda 会根据您所配置的最小值和最大值来扩展事件轮询器的数量，能够迅速将并发次数提升至最多每分钟 1000 次，从而实现对 Amazon SQS 事件的一致性低延迟处理。使用预调配模式会产生额外成本。有关详细定价，请参阅 [AWS Lambda 定价](https://aws.amazon.com/lambda/pricing/)。每个事件轮询器对您的 SQS 队列使用[长轮询](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-short-and-long-polling.html)，每秒最多进行 10 次轮询，这会产生 SQS API 的请求费用。有关详细信息，请参阅 [Amazon SQS 定价](https://aws.amazon.com/sqs/pricing/ )。您可以通过这些最小和最大事件轮询器设置来控制扩展和并发情况，而非使用最大并发设置，因为这两种选项不能同时使用。

**注意**  
您不能同时使用最大并发设置和预置模式。启用预置模式后，您可以通过事件轮询器的最小和最大数量来控制 Amazon SQS 事件源映射的扩展和并发性。

## 为 Amazon SQS 事件源配置最大并发
<a name="events-sqs-max-concurrency"></a>

您可以使用最大并发设置来控制 SQS 事件源的扩展行为。请注意，启用预置模式后无法使用最大并发数。最大并发设置限制了 Amazon SQS 事件源可以调用的函数的并发实例数。最大并发属于事件源级别的设置。如果您将多个 Amazon SQS 事件源映射到一个函数，则每个事件源均可进行单独的最大并发设置。您可以使用最大并发，从而防止某个队列使用函数的所有[预留并发](configuration-concurrency.md)或[账户的其余并发限额](gettingstarted-limits.md)。在 Amazon SQS 事件源上配置最大并发不收取任何费用。

重要的是，最大并发和预留并发为两个独立的设置。不要将最大并发设置为高于函数的预留并发。配置最大并发后，请确保您的函数的预留并发大于或等于该函数上所有 Amazon SQS 事件源的最大总并发数。否则，Lambda 可能会限制您的消息。

当您账户的并发配额设置为默认值 1000 时，Amazon SQS 事件源映射可以扩展为调用最高此值的函数实例，除非您指定最大并发量。

如果您账户的默认并发配额有所增加，Lambda 可能无法调用最高达到新配额的并发函数实例。默认情况下，Lambda 可以扩展为针对 Amazon SQS 事件源映射调用最高 1250 个并发函数实例。如果这不足以满足您的应用场景，请联系 AWS 支持人员，讨论如何提高您账户的 Amazon SQS 事件源映射并发度。

**注意**  
对于 FIFO 队列，并发调用的上限为[消息组 ID](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/using-messagegroupid-property.html) 的数量 (`messageGroupId`) 或最大并发设置，以较低者为准。例如，如果您有六个消息组 ID 并且最大并发设置为 10，则函数最多可以进行六次并发调用。

您可以在新的和现有的 Amazon SQS 事件源映射上配置最大并发。

**使用 Lambda 控制台配置最大并发**

1. 打开 Lamba 控制台的[函数页面](https://console.aws.amazon.com/lambda/home#/functions)。

1. 选择一个函数的名称。

1. 在 **Function overview**（函数概览）下，选择 **SQS**。此操作将打开 **Configuration**（配置）选项卡。

1. 选择 Amazon SQS 触发器，然后选择 **Edit**（编辑）。

1. 对于 **Maximum concurrency**（最大并发），输入 2 到 1,000 之间的数字。要关闭最大并发，将该框保留为空。

1. 选择**保存**。

**使用 AWS Command Line Interface（AWS CLI）配置最大并发**  
使用带 `--scaling-config` 选项的 [update-event-source-mapping](https://awscli.amazonaws.com/v2/documentation/api/latest/reference/lambda/update-event-source-mapping.html) 命令。示例：

```
aws lambda update-event-source-mapping \
    --uuid "a1b2c3d4-5678-90ab-cdef-11111EXAMPLE" \
    --scaling-config '{"MaximumConcurrency":5}'
```

要关闭最大并发，请为 `--scaling-config` 输入一个空值：

```
aws lambda update-event-source-mapping \
    --uuid "a1b2c3d4-5678-90ab-cdef-11111EXAMPLE" \
    --scaling-config "{}"
```

**使用 Lambda API 配置最大并发**  
将 [CreateEventSourceMapping](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html) 或 [UpdateEventSourceMapping](https://docs.aws.amazon.com/lambda/latest/api/API_UpdateEventSourceMapping.html) 操作与 [ScalingConfig](https://docs.aws.amazon.com/lambda/latest/api/API_ScalingConfig.html) 对象结合使用。

# 在 Lambda 中处理 SQS 事件源错误
<a name="services-sqs-errorhandling"></a>

为了处理与 SQS 事件源相关的错误，Lambda 会自动使用带有回退策略的重试策略。您还可以通过配置 SQS 事件源映射来返回[部分批次响应](#services-sqs-batchfailurereporting)，从而自定义错误处理行为。

## 失败调用的回退策略
<a name="services-sqs-backoff-strategy"></a>

如果调用失败，Lambda 会在实施回退策略时尝试重试调用。回退策略略有不同，具体取决于 Lambda 的故障原因是函数代码中的错误还是节流所致。
+  如果您的**函数代码**导致了该错误，Lambda 将停止处理并重试调用。同时，Lambda 会逐渐退出，以减少分配给 Amazon SQS 事件源映射的并发量。队列的可见性超时结束后，消息将再次显示在队列中。
+ 如果调用失败是**节流**造成的，Lambda 会通过减少分配给 Amazon SQS 事件源映射的并发量来逐渐停止重试。Lambda 会继续重试该消息，直到消息的时间戳超过队列的可见性超时，此时 Lambda 会删除该消息。

## 实施部分批处理响应
<a name="services-sqs-batchfailurereporting"></a>

预设情况下,如果您的 Lambda 函数在处理某个批处理时遇到错误，则该批处理中的所有消息都会在队列中重新可见，包括 Lambda 已经成功处理的消息。因此，您的函数最终可能会多次处理同一消息。

对于处理失败的批处理，要避免重新处理其中已经成功处理的消息，您可以将事件源映射配置为仅使失败的消息重新可见。这称为部分批处理响应。要开启部分批处理响应，请在配置事件源映射时为 [FunctionResponseTypes](https://docs.aws.amazon.com/lambda/latest/api/API_UpdateEventSourceMapping.html#lambda-UpdateEventSourceMapping-request-FunctionResponseTypes) 操作指定 `ReportBatchItemFailures`。这可以让您的函数返回部分成功，从而有助于减少对记录进行不必要的重试次数。

**注意**  
Powertools for AWS Lambda 中的[批处理实用程序](https://docs.powertools.aws.dev/lambda/python/latest/utilities/batch/)会自动处理所有部分批处理响应逻辑。此实用程序简化了批处理模式的实现，并减少了正确处理批处理项目失败所需的自定义代码。它适用于 Python、Java、Typescript 和 .NET。

激活 `ReportBatchItemFailures` 后，当函数调用失败时，Lambda 不会 [缩减消息轮询范围](#services-sqs-backoff-strategy)。如果您预计某些消息会失败，并且不希望这些失败影响消息处理速率，请使用 `ReportBatchItemFailures`。

**注意**  
在使用部分批处理响应时，请记住以下几点：  
如果您函数发现了一个异常，则整个批处理将被视为完全失败。
如果您将此功能与 FIFO 队列结合使用，则您的函数应在第一次失败后停止处理消息，并返回 `batchItemFailures` 中的所有失败和未处理的消息。这有助于保持队列中消息的顺序。

**激活部分批处理报告**

1. 查看 [实施部分批处理响应的最佳实践](https://docs.aws.amazon.com/prescriptive-guidance/latest/lambda-event-filtering-partial-batch-responses-for-sqs/best-practices-partial-batch-responses.html)。

1. 运行以下命令来为您的函数激活 `ReportBatchItemFailures`。要检索事件源映射的 UUID，请运行 [list-event-source-mappings](https://docs.aws.amazon.com/cli/latest/reference/lambda/list-event-source-mappings.html) AWS CLI 命令。

   ```
   aws lambda update-event-source-mapping \
   --uuid "a1b2c3d4-5678-90ab-cdef-11111EXAMPLE" \
   --function-response-types "ReportBatchItemFailures"
   ```

1. 更新您的函数代码以捕获所有异常并在 `batchItemFailures` JSON 响应中返回处理失败的消息。`batchItemFailures` 响应必须包含消息 ID 列表，以作为 `itemIdentifier` JSON 值。

   例如，假设一个批处理有五条消息，消息 ID 分别为 `id1`、`id2`、`id3`、`id4` 和 `id5`。您的函数成功处理了 `id1`、`id3` 和 `id5`。要使消息 `id2` 和 `id4` 在队列中重新可见，您的函数应运行以下响应：

   ```
   { 
     "batchItemFailures": [ 
           {
               "itemIdentifier": "id2"
           },
           {
               "itemIdentifier": "id4"
           }
       ]
   }
   ```

   以下函数代码示例将返回批处理中处理失败消息的 ID 列表：

------
#### [ .NET ]

**适用于 .NET 的 SDK**  
 查看 GitHub，了解更多信息。在[无服务器示例](https://github.com/aws-samples/serverless-snippets/tree/main/lambda-function-sqs-report-batch-item-failures)存储库中查找完整示例，并了解如何进行设置和运行。
报告使用 .NET 进行 Lambda SQS 批处理项目失败。  

   ```
   // Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
   // SPDX-License-Identifier: Apache-2.0
   using Amazon.Lambda.Core;
   using Amazon.Lambda.SQSEvents;
   
   // Assembly attribute to enable the Lambda function's JSON input to be converted into a .NET class.
   [assembly: LambdaSerializer(typeof(Amazon.Lambda.Serialization.SystemTextJson.DefaultLambdaJsonSerializer))]
   namespace sqsSample;
   
   public class Function
   {
       public async Task<SQSBatchResponse> FunctionHandler(SQSEvent evnt, ILambdaContext context)
       {
           List<SQSBatchResponse.BatchItemFailure> batchItemFailures = new List<SQSBatchResponse.BatchItemFailure>();
           foreach(var message in evnt.Records)
           {
               try
               {
                   //process your message
                   await ProcessMessageAsync(message, context);
               }
               catch (System.Exception)
               {
                   //Add failed message identifier to the batchItemFailures list
                   batchItemFailures.Add(new SQSBatchResponse.BatchItemFailure{ItemIdentifier=message.MessageId}); 
               }
           }
           return new SQSBatchResponse(batchItemFailures);
       }
   
       private async Task ProcessMessageAsync(SQSEvent.SQSMessage message, ILambdaContext context)
       {
           if (String.IsNullOrEmpty(message.Body))
           {
               throw new Exception("No Body in SQS Message.");
           }
           context.Logger.LogInformation($"Processed message {message.Body}");
           // TODO: Do interesting work based on the new message
           await Task.CompletedTask;
       }
   }
   ```

------
#### [ Go ]

**适用于 Go 的 SDK V2**  
 查看 GitHub，了解更多信息。在[无服务器示例](https://github.com/aws-samples/serverless-snippets/tree/main/lambda-function-sqs-report-batch-item-failures)存储库中查找完整示例，并了解如何进行设置和运行。
报告使用 Go 进行 Lambda SQS 批处理项目失败。  

   ```
   // Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
   // SPDX-License-Identifier: Apache-2.0
   package main
   
   import (
   	"context"
   	"fmt"
   	"github.com/aws/aws-lambda-go/events"
   	"github.com/aws/aws-lambda-go/lambda"
   )
   
   func handler(ctx context.Context, sqsEvent events.SQSEvent) (map[string]interface{}, error) {
   	batchItemFailures := []map[string]interface{}{}
   
   	for _, message := range sqsEvent.Records {
   		if len(message.Body) > 0 {
   			// Your message processing condition here
   			fmt.Printf("Successfully processed message: %s\n", message.Body)
   		} else {
   			// Message processing failed
   			fmt.Printf("Failed to process message %s\n", message.MessageId)
   			batchItemFailures = append(batchItemFailures, map[string]interface{}{"itemIdentifier": message.MessageId})
   		}
   	}
   
   	sqsBatchResponse := map[string]interface{}{
   		"batchItemFailures": batchItemFailures,
   	}
   	return sqsBatchResponse, nil
   }
   
   func main() {
   	lambda.Start(handler)
   }
   ```

------
#### [ Java ]

**适用于 Java 的 SDK 2.x**  
 查看 GitHub，了解更多信息。在[无服务器示例](https://github.com/aws-samples/serverless-snippets/tree/main/lambda-function-sqs-report-batch-item-failures)存储库中查找完整示例，并了解如何进行设置和运行。
报告使用 Java 进行 Lambda SQS 批处理项目失败。  

   ```
   // Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
   // SPDX-License-Identifier: Apache-2.0
   import com.amazonaws.services.lambda.runtime.Context;
   import com.amazonaws.services.lambda.runtime.RequestHandler;
   import com.amazonaws.services.lambda.runtime.events.SQSEvent;
   import com.amazonaws.services.lambda.runtime.events.SQSBatchResponse;
    
   import java.util.ArrayList;
   import java.util.List;
    
   public class ProcessSQSMessageBatch implements RequestHandler<SQSEvent, SQSBatchResponse> {
       @Override
       public SQSBatchResponse handleRequest(SQSEvent sqsEvent, Context context) {
            List<SQSBatchResponse.BatchItemFailure> batchItemFailures = new ArrayList<SQSBatchResponse.BatchItemFailure>();
   
            for (SQSEvent.SQSMessage message : sqsEvent.getRecords()) {
                try {
                    //process your message
                } catch (Exception e) {
                    //Add failed message identifier to the batchItemFailures list
                    batchItemFailures.add(new SQSBatchResponse.BatchItemFailure(message.getMessageId()));
                }
            }
            return new SQSBatchResponse(batchItemFailures);
        }
   }
   ```

------
#### [ JavaScript ]

**SDK for JavaScript（v3）**  
 查看 GitHub，了解更多信息。在[无服务器示例](https://github.com/aws-samples/serverless-snippets/tree/main/lambda-function-sqs-report-batch-item-failures)存储库中查找完整示例，并了解如何进行设置和运行。
报告使用 JavaScript 进行 Lambda SQS 批处理项目失败。  

   ```
   // Node.js 20.x Lambda runtime, AWS SDK for Javascript V3
   export const handler = async (event, context) => {
       const batchItemFailures = [];
       for (const record of event.Records) {
           try {
               await processMessageAsync(record, context);
           } catch (error) {
               batchItemFailures.push({ itemIdentifier: record.messageId });
           }
       }
       return { batchItemFailures };
   };
   
   async function processMessageAsync(record, context) {
       if (record.body && record.body.includes("error")) {
           throw new Error("There is an error in the SQS Message.");
       }
       console.log(`Processed message: ${record.body}`);
   }
   ```
报告使用 TypeScript 进行 Lambda SQS 批处理项目失败。  

   ```
   // Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
   // SPDX-License-Identifier: Apache-2.0
   import { SQSEvent, SQSBatchResponse, Context, SQSBatchItemFailure, SQSRecord } from 'aws-lambda';
   
   export const handler = async (event: SQSEvent, context: Context): Promise<SQSBatchResponse> => {
       const batchItemFailures: SQSBatchItemFailure[] = [];
   
       for (const record of event.Records) {
           try {
               await processMessageAsync(record);
           } catch (error) {
               batchItemFailures.push({ itemIdentifier: record.messageId });
           }
       }
   
       return {batchItemFailures: batchItemFailures};
   };
   
   async function processMessageAsync(record: SQSRecord): Promise<void> {
       if (record.body && record.body.includes("error")) {
           throw new Error('There is an error in the SQS Message.');
       }
       console.log(`Processed message ${record.body}`);
   }
   ```

------
#### [ PHP ]

**适用于 PHP 的 SDK**  
 查看 GitHub，了解更多信息。在[无服务器示例](https://github.com/aws-samples/serverless-snippets/tree/main/lambda-function-sqs-report-batch-item-failures)存储库中查找完整示例，并了解如何进行设置和运行。
报告使用 PHP 进行 Lambda SQS 批处理项目失败。  

   ```
   // Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
   // SPDX-License-Identifier: Apache-2.0
   <?php
   
   use Bref\Context\Context;
   use Bref\Event\Sqs\SqsEvent;
   use Bref\Event\Sqs\SqsHandler;
   use Bref\Logger\StderrLogger;
   
   require __DIR__ . '/vendor/autoload.php';
   
   class Handler extends SqsHandler
   {
       private StderrLogger $logger;
       public function __construct(StderrLogger $logger)
       {
           $this->logger = $logger;
       }
   
       /**
        * @throws JsonException
        * @throws \Bref\Event\InvalidLambdaEvent
        */
       public function handleSqs(SqsEvent $event, Context $context): void
       {
           $this->logger->info("Processing SQS records");
           $records = $event->getRecords();
   
           foreach ($records as $record) {
               try {
                   // Assuming the SQS message is in JSON format
                   $message = json_decode($record->getBody(), true);
                   $this->logger->info(json_encode($message));
                   // TODO: Implement your custom processing logic here
               } catch (Exception $e) {
                   $this->logger->error($e->getMessage());
                   // failed processing the record
                   $this->markAsFailed($record);
               }
           }
           $totalRecords = count($records);
           $this->logger->info("Successfully processed $totalRecords SQS records");
       }
   }
   
   $logger = new StderrLogger();
   return new Handler($logger);
   ```

------
#### [ Python ]

**适用于 Python 的 SDK（Boto3）**  
 查看 GitHub，了解更多信息。在[无服务器示例](https://github.com/aws-samples/serverless-snippets/tree/main/lambda-function-sqs-report-batch-item-failures)存储库中查找完整示例，并了解如何进行设置和运行。
报告使用 Python 进行 Lambda SQS 批处理项目失败。  

   ```
   # Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
   # SPDX-License-Identifier: Apache-2.0
   
   def lambda_handler(event, context):
       if event:
           batch_item_failures = []
           sqs_batch_response = {}
        
           for record in event["Records"]:
               try:
                   print(f"Processed message: {record['body']}")
               except Exception as e:
                   batch_item_failures.append({"itemIdentifier": record['messageId']})
           
           sqs_batch_response["batchItemFailures"] = batch_item_failures
           return sqs_batch_response
   ```

------
#### [ Ruby ]

**适用于 Ruby 的 SDK**  
 查看 GitHub，了解更多信息。在[无服务器示例](https://github.com/aws-samples/serverless-snippets/tree/main/integration-sqs-to-lambda-with-batch-item-handling)存储库中查找完整示例，并了解如何进行设置和运行。
报告使用 Ruby 进行 Lambda SQS 批处理项目失败。  

   ```
   # Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
   # SPDX-License-Identifier: Apache-2.0
   require 'json'
   
   def lambda_handler(event:, context:)
     if event
       batch_item_failures = []
       sqs_batch_response = {}
   
       event["Records"].each do |record|
         begin
           # process message
         rescue StandardError => e
           batch_item_failures << {"itemIdentifier" => record['messageId']}
         end
       end
   
       sqs_batch_response["batchItemFailures"] = batch_item_failures
       return sqs_batch_response
     end
   end
   ```

------
#### [ Rust ]

**适用于 Rust 的 SDK**  
 查看 GitHub，了解更多信息。在[无服务器示例](https://github.com/aws-samples/serverless-snippets/tree/main/lambda-function-sqs-report-batch-item-failures)存储库中查找完整示例，并了解如何进行设置和运行。
报告使用 Rust 进行 Lambda SQS 批处理项目失败。  

   ```
   // Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
   // SPDX-License-Identifier: Apache-2.0
   use aws_lambda_events::{
       event::sqs::{SqsBatchResponse, SqsEvent},
       sqs::{BatchItemFailure, SqsMessage},
   };
   use lambda_runtime::{run, service_fn, Error, LambdaEvent};
   
   async fn process_record(_: &SqsMessage) -> Result<(), Error> {
       Err(Error::from("Error processing message"))
   }
   
   async fn function_handler(event: LambdaEvent<SqsEvent>) -> Result<SqsBatchResponse, Error> {
       let mut batch_item_failures = Vec::new();
       for record in event.payload.records {
           match process_record(&record).await {
               Ok(_) => (),
               Err(_) => batch_item_failures.push(BatchItemFailure {
                   item_identifier: record.message_id.unwrap(),
               }),
           }
       }
   
       Ok(SqsBatchResponse {
           batch_item_failures,
       })
   }
   
   #[tokio::main]
   async fn main() -> Result<(), Error> {
       run(service_fn(function_handler)).await
   }
   ```

------

如果处理失败的事件没有返回到队列，请参阅 AWS 知识中心中的 [如何排查 Lambda 函数 SQS ReportBatchItemFailures 问题？](https://aws.amazon.com/premiumsupport/knowledge-center/lambda-sqs-report-batch-item-failures/)。

### 成功和失败的条件
<a name="sqs-batchfailurereporting-conditions"></a>

如果您的函数返回以下任意一项，则 Lambda 会将批处理视为完全成功：
+ 空的 `batchItemFailures` 列表
+ Null `batchItemFailures` 列表
+ 空的 `EventResponse`
+ Null `EventResponse`

如果您的函数返回以下任意一项，则 Lambda 会将批处理视为完全失败：
+ JSON 响应无效
+ 空字符串 `itemIdentifier`
+ Null `itemIdentifier`
+ 包含错误密钥名的 `itemIdentifier`
+ 具有某个消息 ID 的 `itemIdentifier` 值不存在

### CloudWatch 指标
<a name="sqs-batchfailurereporting-metrics"></a>

要确定函数是否在正确报告批处理项目失败情况，您可以监控 Amazon CloudWatch 中的 `NumberOfMessagesDeleted` 和 `ApproximateAgeOfOldestMessage` Amazon SQS 指标。
+ `NumberOfMessagesDeleted` 会跟踪从队列中删除的消息数量。如果该值降至 0，则表明您的函数响应没有正确返回失败的消息。
+ `ApproximateAgeOfOldestMessage` 会跟踪最早的消息在队列中停留的时间。如果此指标急剧增加，则可能表明您的函数没有正确返回失败的消息。

### 使用 Powertools for AWS Lambda 批处理器
<a name="services-sqs-batchfailurereporting-powertools"></a>

Powertools for AWS Lambda 中的批处理器实用程序会自动处理部分批处理响应逻辑，从而降低实施批处理故障报告的复杂性。下面是使用批处理器的示例：

**Python**  
有关完整的示例和设置说明，请参阅[批处理器文档](https://docs.powertools.aws.dev/lambda/python/latest/utilities/batch/)。
使用 AWS Lambda 批处理器处理 Amazon SQS 消息。  

```
import json
from aws_lambda_powertools import Logger
from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType, process_partial_response
from aws_lambda_powertools.utilities.data_classes import SQSEvent
from aws_lambda_powertools.utilities.typing import LambdaContext

processor = BatchProcessor(event_type=EventType.SQS)
logger = Logger()

def record_handler(record):
    logger.info(record)
    # Your business logic here
    # Raise an exception to mark this record as failed
    
def lambda_handler(event, context: LambdaContext):
    return process_partial_response(
        event=event, 
        record_handler=record_handler, 
        processor=processor,
        context=context
    )
```

**TypeScript**  
有关完整的示例和设置说明，请参阅[批处理器文档](https://docs.aws.amazon.com/powertools/typescript/latest/features/batch/)。
使用 AWS Lambda 批处理器处理 Amazon SQS 消息。  

```
import { BatchProcessor, EventType, processPartialResponse } from '@aws-lambda-powertools/batch';
import { Logger } from '@aws-lambda-powertools/logger';
import type { SQSEvent, Context } from 'aws-lambda';

const processor = new BatchProcessor(EventType.SQS);
const logger = new Logger();

const recordHandler = async (record: any): Promise<void> => {
    logger.info('Processing record', { record });
    // Your business logic here
    // Throw an error to mark this record as failed
};

export const handler = async (event: SQSEvent, context: Context) => {
    return processPartialResponse(event, recordHandler, processor, {
        context,
    });
};
```

# Amazon SQS 事件源映射的 Lambda 参数
<a name="services-sqs-parameters"></a>

所有 Lambda 事件源类型共享相同的 [CreateEventSourceMapping](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html) 和 [UpdateEventSourceMapping](https://docs.aws.amazon.com/lambda/latest/api/API_UpdateEventSourceMapping.html) API 操作。但是，只有部分参数适用于 Amazon SQS。


| 参数 | 必需 | 默认值 | 备注 | 
| --- | --- | --- | --- | 
|  BatchSize  |  N  |  10  |  对于标准队列，最大值为 10000。对于 FIFO 队列，最大值为 10。  | 
|  启用  |  N  |  真实  | none  | 
|  EventSourceArn  |  Y  | 不适用 |  数据流或流使用者的 ARN  | 
|  FunctionName  |  是  | 不适用  | none  | 
|  FilterCriteria  |  N  |  不适用   |  [控制 Lambda 向您的函数发送的事件](invocation-eventfiltering.md)  | 
|  FunctionResponseTypes  |  N  | 不适用  |  要使您的函数报告某个批处理中的特定失败，请在 `FunctionResponseTypes` 中包含值 `ReportBatchItemFailures`。有关更多信息，请参阅 [实施部分批处理响应](services-sqs-errorhandling.md#services-sqs-batchfailurereporting)。  | 
|  MaximumBatchingWindowInSeconds  |  N  |  0  | FIFO 队列不支持批处理窗口 | 
|  ProvisionedPollerConfig  |  N  |  不适用  |  为 SQS 事件源映射配置专用事件轮询器的最小数量（2-200）和最大数量（2-2000）。每个轮询器可以处理最高 1 MB/秒的吞吐量和 10 次并发调用。  | 
|  ScalingConfig  |  N  |  不适用   |  [为 Amazon SQS 事件源配置最大并发](services-sqs-scaling.md#events-sqs-max-concurrency)  | 

# 对 Amazon SQS 事件源使用事件筛选
<a name="with-sqs-filtering"></a>

您可以使用事件筛选，控制 Lambda 将流或队列中的哪些记录发送给函数。有关事件筛选工作原理的一般信息，请参阅 [控制 Lambda 向您的函数发送的事件](invocation-eventfiltering.md)。

本节重点介绍 Amazon SQS 事件源的事件筛选。

**注意**  
Amazon SQS 事件源映射仅支持对 `body` 键进行筛选。

**Topics**
+ [Amazon SQS 事件筛选基础知识](#filtering-SQS)

## Amazon SQS 事件筛选基础知识
<a name="filtering-SQS"></a>

假设 Amazon SQS 队列包含以下 JSON 格式的消息。

```
{
    "RecordNumber": 1234,
    "TimeStamp": "yyyy-mm-ddThh:mm:ss",
    "RequestCode": "AAAA"
}
```

此队列的示例记录将如下所示。

```
{
    "messageId": "059f36b4-87a3-44ab-83d2-661975830a7d",
    "receiptHandle": "AQEBwJnKyrHigUMZj6rYigCgxlaS3SLy0a...",
    "body": "{\n "RecordNumber": 1234,\n "TimeStamp": "yyyy-mm-ddThh:mm:ss",\n "RequestCode": "AAAA"\n}",
    "attributes": {
        "ApproximateReceiveCount": "1",
        "SentTimestamp": "1545082649183",
        "SenderId": "AIDAIENQZJOLO23YVJ4VO",
        "ApproximateFirstReceiveTimestamp": "1545082649185"
        },
    "messageAttributes": {},
    "md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3",
    "eventSource": "aws:sqs",
    "eventSourceARN": "arn:aws:sqs:us-west-2:123456789012:my-queue",
    "awsRegion": "us-west-2"
}
```

要根据 Amazon SQS 消息的内容进行筛选，请使用 Amazon SQS 消息记录中的 `body` 键。假设您只想处理 Amazon SQS 消息中 `RequestCode` 为“BBBB”的记录。`FilterCriteria` 对象将如下所示。

```
{
    "Filters": [
        {
            "Pattern": "{ \"body\" : { \"RequestCode\" : [ \"BBBB\" ] } }"
        }
    ]
}
```

为了更清楚起见，以下是在纯 JSON 中展开的筛选条件 `Pattern` 的值。

```
{
    "body": {
        "RequestCode": [ "BBBB" ]
        }
}
```

您可以使用控制台、AWS CLI 或 AWS SAM 模板添加筛选条件。

------
#### [ Console ]

要使用控制台添加此筛选条件，请按照 [将筛选条件附加到事件源映射（控制台）](invocation-eventfiltering.md#filtering-console) 中的说明，为**筛选条件**输入以下字符串。

```
{ "body" : { "RequestCode" : [ "BBBB" ] } }
```

------
#### [ AWS CLI ]

要使用 AWS Command Line Interface（AWS CLI）创建包含这些筛选条件的新事件源映射，请运行以下命令。

```
aws lambda create-event-source-mapping \
    --function-name my-function \
    --event-source-arn arn:aws:sqs:us-east-2:123456789012:my-queue \
    --filter-criteria '{"Filters": [{"Pattern": "{ \"body\" : { \"RequestCode\" : [ \"BBBB\" ] } }"}]}'
```

要将这些筛选条件添加到现有事件源映射中，请运行以下命令。

```
aws lambda update-event-source-mapping \
    --uuid "a1b2c3d4-5678-90ab-cdef-11111EXAMPLE" \
    --filter-criteria '{"Filters": [{"Pattern": "{ \"body\" : { \"RequestCode\" : [ \"BBBB\" ] } }"}]}'
```

------
#### [ AWS SAM ]

要使用 AWS SAM 添加此筛选条件，请将以下代码段添加到事件源的 YAML 模板中。

```
FilterCriteria:
  Filters:
    - Pattern: '{ "body" : { "RequestCode" : [ "BBBB" ] } }'
```

------

假设您希望函数只处理 `RecordNumber` 大于 9999 的记录。`FilterCriteria` 对象将如下所示。

```
{
    "Filters": [
        {
            "Pattern": "{ \"body\" : { \"RecordNumber\" : [ { \"numeric\": [ \">\", 9999 ] } ] } }"
        }
    ]
}
```

为了更清楚起见，以下是在纯 JSON 中展开的筛选条件 `Pattern` 的值。

```
{
    "body": {
        "RecordNumber": [
            {
                "numeric": [ ">", 9999 ]
            }
        ]
    }
}
```

您可以使用控制台、AWS CLI 或 AWS SAM 模板添加筛选条件。

------
#### [ Console ]

要使用控制台添加此筛选条件，请按照 [将筛选条件附加到事件源映射（控制台）](invocation-eventfiltering.md#filtering-console) 中的说明，为**筛选条件**输入以下字符串。

```
{ "body" : { "RecordNumber" : [ { "numeric": [ ">", 9999 ] } ] } }
```

------
#### [ AWS CLI ]

要使用 AWS Command Line Interface（AWS CLI）创建包含这些筛选条件的新事件源映射，请运行以下命令。

```
aws lambda create-event-source-mapping \
    --function-name my-function \
    --event-source-arn arn:aws:sqs:us-east-2:123456789012:my-queue \
    --filter-criteria '{"Filters": [{"Pattern": "{ \"body\" : { \"RecordNumber\" : [ { \"numeric\": [ \">\", 9999 ] } ] } }"}]}'
```

要将这些筛选条件添加到现有事件源映射中，请运行以下命令。

```
aws lambda update-event-source-mapping \
    --uuid "a1b2c3d4-5678-90ab-cdef-11111EXAMPLE" \
    --filter-criteria '{"Filters": [{"Pattern": "{ \"body\" : { \"RecordNumber\" : [ { \"numeric\": [ \">\", 9999 ] } ] } }"}]}'
```

------
#### [ AWS SAM ]

要使用 AWS SAM 添加此筛选条件，请将以下代码段添加到事件源的 YAML 模板中。

```
FilterCriteria:
  Filters:
    - Pattern: '{ "body" : { "RecordNumber" : [ { "numeric": [ ">", 9999 ] } ] } }'
```

------

对于 Amazon SQS，消息正文可以是任何字符串。但如果您的 `FilterCriteria` 期望 `body` 为有效的 JSON 格式，则可能会导致问题。反之亦然：如果传入的消息正文为 JSON 格式，但筛选条件期望 `body` 为纯字符串，这可能会导致出现意外行为。

要避免此问题，请确保 `FilterCriteria` 中的正文格式与您从队列中收到的消息中的 `body` 的期望格式一致。在筛选消息之前，Lambda 会自动评估传入消息正文的格式以及 `body` 的筛选条件模式的格式。如果不一致，Lambda 将会删除此消息。下表汇总了此评估：


| 传入消息 `body` 格式 | 筛选条件模式 `body` 格式 | 导致的操作 | 
| --- | --- | --- | 
|  纯字符串  |  纯字符串  |  Lambda 根据您的筛选条件进行筛选。  | 
|  纯字符串  |  数据属性中没有筛选条件模式  |  Lambda 根据您的筛选条件进行筛选（仅限其他元数据属性）。  | 
|  纯字符串  |  有效 JSON  |  Lambda 将会丢弃消息。  | 
|  有效 JSON  |  纯字符串  |  Lambda 将会丢弃消息。  | 
|  有效 JSON  |  数据属性中没有筛选条件模式  |  Lambda 根据您的筛选条件进行筛选（仅限其他元数据属性）。  | 
|  有效 JSON  |  有效 JSON  |  Lambda 根据您的筛选条件进行筛选。  | 

# 教程：将 Lambda 与 Amazon SQS 结合使用
<a name="with-sqs-example"></a>

在此教程中，您将创建一个会使用来自某个 [Amazon Simple Queue Service（Amazon SQS）](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/welcome.html)队列的消息的 Lambda 函数。只要有新消息添加到队列，Lambda 函数就会运行。函数会将消息写入 Amazon CloudWatch Logs 日志流。下图显示了您用于完成教程的 AWS 资源。

![\[\]](http://docs.aws.amazon.com/zh_cn/lambda/latest/dg/images/sqs_tut_resources.png)


要完成本教程，请执行以下步骤：

1. 创建将消息写入 CloudWatch Logs 的 Lambda 函数。

1. 创建 Amazon SQS 队列。

1. 创建 Lambda 事件源映射。事件源映射会读取 Amazon SQS 队列并在添加新消息时调用 Lambda 函数。

1. 将消息添加到队列并在 CloudWatch Logs 中监控结果来测试设置。

## 先决条件
<a name="with-sqs-prepare"></a>

### 安装 AWS Command Line Interface
<a name="install_aws_cli"></a>

如果您尚未安装 AWS Command Line Interface，请按照[安装或更新最新版本的 AWS CLI](https://docs.aws.amazon.com/cli/latest/userguide/getting-started-install.html) 中的步骤进行安装。

本教程需要命令行终端或 Shell 来运行命令。在 Linux 和 macOS 中，可使用您首选的 Shell 和程序包管理器。

**注意**  
在 Windows 中，操作系统的内置终端不支持您经常与 Lambda 一起使用的某些 Bash CLI 命令（例如 `zip`）。[安装 Windows Subsystem for Linux](https://docs.microsoft.com/en-us/windows/wsl/install-win10)，获取 Ubuntu 和 Bash 与 Windows 集成的版本。

## 创建执行角色
<a name="with-sqs-create-execution-role"></a>

![\[\]](http://docs.aws.amazon.com/zh_cn/lambda/latest/dg/images/sqs_tut_steps1.png)


[执行角色](lambda-intro-execution-role.md)是一个 AWS Identity and Access Management（IAM）角色，用于向 Lambda 函数授予访问 AWS 服务 和资源的权限。要允许函数从 Amazon SQS 中读取项目，请附加 **AWSLambdaSQSQueueExecutionRole** 权限策略。

**创建执行角色并附加 Amazon SQS 权限策略**

1. 打开 IAM 控制台的[角色页面](https://console.aws.amazon.com/iam/home#/roles)。

1. 选择**创建角色**。

1. 在**可信实体类型**中选择 **AWS 服务**。

1. 在**使用案例**中选择 **Lambda**。

1. 选择**下一步**。

1. 在**权限策略**搜索框中输入 **AWSLambdaSQSQueueExecutionRole**。

1. 选择 **AWSLambdaSQSQueueExecutionRole** 策略，然后选择**下一步**。

1. 在**角色详细信息**下，为**角色名称**输入 **lambda-sqs-role**，然后选择**创建角色**。

角色创建后，记下执行角色的 Amazon 资源名称（ARN）。您将在后面的步骤中用到它。

## 创建函数
<a name="with-sqs-create-function"></a>

![\[\]](http://docs.aws.amazon.com/zh_cn/lambda/latest/dg/images/sqs_tut_steps2.png)


创建一个处理您的 Amazon SQS 消息的 Lambda 函数。函数代码将 Amazon SQS 消息的正文记录到 CloudWatch Logs 中。

本教程使用 Node.js 24 运行时系统，但我们还提供了其他运行时系统语言的示例代码。您可以选择以下框中的选项卡，查看适用于您感兴趣的运行时系统的代码。您将在此步骤中使用的 JavaScript 代码是 **JavaScript** 选项卡中显示的第一个示例。

------
#### [ .NET ]

**适用于 .NET 的 SDK**  
 查看 GitHub，了解更多信息。在[无服务器示例](https://github.com/aws-samples/serverless-snippets/tree/main/integration-sqs-to-lambda)存储库中查找完整示例，并了解如何进行设置和运行。
通过 .NET 将 SQS 事件与 Lambda 结合使用。  

```
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
﻿using Amazon.Lambda.Core;
using Amazon.Lambda.SQSEvents;


// Assembly attribute to enable the Lambda function's JSON input to be converted into a .NET class.
[assembly: LambdaSerializer(typeof(Amazon.Lambda.Serialization.SystemTextJson.DefaultLambdaJsonSerializer))]

namespace SqsIntegrationSampleCode
{
    public async Task FunctionHandler(SQSEvent evnt, ILambdaContext context)
    {
        foreach (var message in evnt.Records)
        {
            await ProcessMessageAsync(message, context);
        }

        context.Logger.LogInformation("done");
    }

    private async Task ProcessMessageAsync(SQSEvent.SQSMessage message, ILambdaContext context)
    {
        try
        {
            context.Logger.LogInformation($"Processed message {message.Body}");

            // TODO: Do interesting work based on the new message
            await Task.CompletedTask;
        }
        catch (Exception e)
        {
            //You can use Dead Letter Queue to handle failures. By configuring a Lambda DLQ.
            context.Logger.LogError($"An error occurred");
            throw;
        }

    }
}
```

------
#### [ Go ]

**适用于 Go 的 SDK V2**  
 查看 GitHub，了解更多信息。在[无服务器示例](https://github.com/aws-samples/serverless-snippets/tree/main/integration-sqs-to-lambda)存储库中查找完整示例，并了解如何进行设置和运行。
使用 Go 将 SQS 事件与 Lambda 结合使用。  

```
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
package integration_sqs_to_lambda

import (
	"fmt"
	"github.com/aws/aws-lambda-go/events"
	"github.com/aws/aws-lambda-go/lambda"
)

func handler(event events.SQSEvent) error {
	for _, record := range event.Records {
		err := processMessage(record)
		if err != nil {
			return err
		}
	}
	fmt.Println("done")
	return nil
}

func processMessage(record events.SQSMessage) error {
	fmt.Printf("Processed message %s\n", record.Body)
	// TODO: Do interesting work based on the new message
	return nil
}

func main() {
	lambda.Start(handler)
}
```

------
#### [ Java ]

**适用于 Java 的 SDK 2.x**  
 查看 GitHub，了解更多信息。在[无服务器示例](https://github.com/aws-samples/serverless-snippets/tree/main/integration-sqs-to-lambda)存储库中查找完整示例，并了解如何进行设置和运行。
通过 Java 将 SQS 事件与 Lambda 结合使用。  

```
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.amazonaws.services.lambda.runtime.events.SQSEvent;
import com.amazonaws.services.lambda.runtime.events.SQSEvent.SQSMessage;

public class Function implements RequestHandler<SQSEvent, Void> {
    @Override
    public Void handleRequest(SQSEvent sqsEvent, Context context) {
        for (SQSMessage msg : sqsEvent.getRecords()) {
            processMessage(msg, context);
        }
        context.getLogger().log("done");
        return null;
    }

    private void processMessage(SQSMessage msg, Context context) {
        try {
            context.getLogger().log("Processed message " + msg.getBody());

            // TODO: Do interesting work based on the new message

        } catch (Exception e) {
            context.getLogger().log("An error occurred");
            throw e;
        }

    }
}
```

------
#### [ JavaScript ]

**SDK for JavaScript（v3）**  
 查看 GitHub，了解更多信息。在[无服务器示例](https://github.com/aws-samples/serverless-snippets/blob/main/integration-sqs-to-lambda)存储库中查找完整示例，并了解如何进行设置和运行。
通过 JavaScript 将 SQS 事件与 Lambda 结合使用。  

```
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
exports.handler = async (event, context) => {
  for (const message of event.Records) {
    await processMessageAsync(message);
  }
  console.info("done");
};

async function processMessageAsync(message) {
  try {
    console.log(`Processed message ${message.body}`);
    // TODO: Do interesting work based on the new message
    await Promise.resolve(1); //Placeholder for actual async work
  } catch (err) {
    console.error("An error occurred");
    throw err;
  }
}
```
通过 TypeScript 将 SQS 事件与 Lambda 结合使用。  

```
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
import { SQSEvent, Context, SQSHandler, SQSRecord } from "aws-lambda";

export const functionHandler: SQSHandler = async (
  event: SQSEvent,
  context: Context
): Promise<void> => {
  for (const message of event.Records) {
    await processMessageAsync(message);
  }
  console.info("done");
};

async function processMessageAsync(message: SQSRecord): Promise<any> {
  try {
    console.log(`Processed message ${message.body}`);
    // TODO: Do interesting work based on the new message
    await Promise.resolve(1); //Placeholder for actual async work
  } catch (err) {
    console.error("An error occurred");
    throw err;
  }
}
```

------
#### [ PHP ]

**适用于 PHP 的 SDK**  
 查看 GitHub，了解更多信息。在[无服务器示例](https://github.com/aws-samples/serverless-snippets/tree/main/integration-sqs-to-lambda)存储库中查找完整示例，并了解如何进行设置和运行。
使用 PHP 将 SQS 事件与 Lambda 结合使用。  

```
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
<?php

# using bref/bref and bref/logger for simplicity

use Bref\Context\Context;
use Bref\Event\InvalidLambdaEvent;
use Bref\Event\Sqs\SqsEvent;
use Bref\Event\Sqs\SqsHandler;
use Bref\Logger\StderrLogger;

require __DIR__ . '/vendor/autoload.php';

class Handler extends SqsHandler
{
    private StderrLogger $logger;
    public function __construct(StderrLogger $logger)
    {
        $this->logger = $logger;
    }

    /**
     * @throws InvalidLambdaEvent
     */
    public function handleSqs(SqsEvent $event, Context $context): void
    {
        foreach ($event->getRecords() as $record) {
            $body = $record->getBody();
            // TODO: Do interesting work based on the new message
        }
    }
}

$logger = new StderrLogger();
return new Handler($logger);
```

------
#### [ Python ]

**适用于 Python 的 SDK（Boto3）**  
 查看 GitHub，了解更多信息。在[无服务器示例](https://github.com/aws-samples/serverless-snippets/tree/main/integration-sqs-to-lambda)存储库中查找完整示例，并了解如何进行设置和运行。
使用 Python 将 SQS 事件与 Lambda 结合使用。  

```
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0
def lambda_handler(event, context):
    for message in event['Records']:
        process_message(message)
    print("done")

def process_message(message):
    try:
        print(f"Processed message {message['body']}")
        # TODO: Do interesting work based on the new message
    except Exception as err:
        print("An error occurred")
        raise err
```

------
#### [ Ruby ]

**适用于 Ruby 的 SDK**  
 查看 GitHub，了解更多信息。在[无服务器示例](https://github.com/aws-samples/serverless-snippets/tree/main/integration-sqs-to-lambda)存储库中查找完整示例，并了解如何进行设置和运行。
使用 Ruby 将 SQS 事件与 Lambda 结合使用。  

```
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0
def lambda_handler(event:, context:)
  event['Records'].each do |message|
    process_message(message)
  end
  puts "done"
end

def process_message(message)
  begin
    puts "Processed message #{message['body']}"
    # TODO: Do interesting work based on the new message
  rescue StandardError => err
    puts "An error occurred"
    raise err
  end
end
```

------
#### [ Rust ]

**适用于 Rust 的 SDK**  
 查看 GitHub，了解更多信息。在[无服务器示例](https://github.com/aws-samples/serverless-snippets/tree/main/integration-sqs-to-lambda)存储库中查找完整示例，并了解如何进行设置和运行。
通过 Rust 将 SQS 事件与 Lambda 结合使用。  

```
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
use aws_lambda_events::event::sqs::SqsEvent;
use lambda_runtime::{run, service_fn, Error, LambdaEvent};

async fn function_handler(event: LambdaEvent<SqsEvent>) -> Result<(), Error> {
    event.payload.records.iter().for_each(|record| {
        // process the record
        tracing::info!("Message body: {}", record.body.as_deref().unwrap_or_default())
    });

    Ok(())
}

#[tokio::main]
async fn main() -> Result<(), Error> {
    tracing_subscriber::fmt()
        .with_max_level(tracing::Level::INFO)
        // disable printing the name of the module in every log line.
        .with_target(false)
        // disabling time is handy because CloudWatch will add the ingestion time.
        .without_time()
        .init();

    run(service_fn(function_handler)).await
}
```

------

**创建 Node.js Lambda 函数**

1. 为项目创建一个目录，然后切换到该目录。

   ```
   mkdir sqs-tutorial
   cd sqs-tutorial
   ```

1. 将示例 JavaScript 代码复制到名为 `index.js` 的新文件中。

1. 使用以下 `zip` 命令创建部署包。

   ```
   zip function.zip index.js
   ```

1. 使用 [create-function](https://awscli.amazonaws.com/v2/documentation/api/latest/reference/lambda/create-function.html) AWS CLI 命令创建 Lambda 函数。对于 `role` 参数，请输入您之前创建的执行角色的 ARN。
**注意**  
Lambda 函数和 Amazon SQS 队列必须位于同一 AWS 区域。

   ```
   aws lambda create-function --function-name ProcessSQSRecord \
   --zip-file fileb://function.zip --handler index.handler --runtime nodejs24.x \
   --role arn:aws:iam::111122223333:role/lambda-sqs-role
   ```

## 测试此函数
<a name="with-sqs-create-test-function"></a>

![\[\]](http://docs.aws.amazon.com/zh_cn/lambda/latest/dg/images/sqs_tut_steps3.png)


使用 `invoke` AWS CLI 命令和一个示例 Amazon SQS 事件手动调用您的 Lambda 函数。

**使用示例事件调用 Lambda 函数**

1. 将下列 JSON 保存为名为 `input.json` 的文件。此 JSON 模拟 Amazon SQS 可能发送到 Lambda 函数的事件，其中 `"body"` 包含来自该队列的实际消息。在本示例中，消息为 `"test"`。  
**Example Amazon SQS 事件**  

   此为测试事件，无需您更改消息或账号。

   ```
   {
       "Records": [
           {
               "messageId": "059f36b4-87a3-44ab-83d2-661975830a7d",
               "receiptHandle": "AQEBwJnKyrHigUMZj6rYigCgxlaS3SLy0a...",
               "body": "test",
               "attributes": {
                   "ApproximateReceiveCount": "1",
                   "SentTimestamp": "1545082649183",
                   "SenderId": "AIDAIENQZJOLO23YVJ4VO",
                   "ApproximateFirstReceiveTimestamp": "1545082649185"
               },
               "messageAttributes": {},
               "md5OfBody": "098f6bcd4621d373cade4e832627b4f6",
               "eventSource": "aws:sqs",
               "eventSourceARN": "arn:aws:sqs:us-east-1:111122223333:my-queue",
               "awsRegion": "us-east-1"
           }
       ]
   }
   ```

1. 运行以下[调用](https://awscli.amazonaws.com/v2/documentation/api/latest/reference/lambda/invoke.html) AWS CLI 命令。此命令将在响应中返回 CloudWatch 日志。有关检索日志的更多信息，请参阅[使用 AWS CLI 访问日志](monitoring-cloudwatchlogs-view.md#monitoring-cloudwatchlogs-cli)。

   ```
   aws lambda invoke --function-name ProcessSQSRecord --payload file://input.json out --log-type Tail \
   --query 'LogResult' --output text --cli-binary-format raw-in-base64-out | base64 --decode
   ```

   如果使用 **cli-binary-format** 版本 2，则 AWS CLI 选项是必需的。要将其设为默认设置，请运行 `aws configure set cli-binary-format raw-in-base64-out`。有关更多信息，请参阅*版本 2 的 AWS Command Line Interface 用户指南*中的 [AWS CLI 支持的全局命令行选项](https://docs.aws.amazon.com/cli/latest/userguide/cli-configure-options.html#cli-configure-options-list)。

1. 在响应中查找 `INFO` 日志。Lambda 函数会在此处记录消息正文。应看到类似如下内容的日志：

   ```
   2023-09-11T22:45:04.271Z	348529ce-2211-4222-9099-59d07d837b60	INFO	Processed message test
   2023-09-11T22:45:04.288Z	348529ce-2211-4222-9099-59d07d837b60	INFO	done
   ```

## 创建 Amazon SQS 队列
<a name="with-sqs-configure-sqs"></a>

![\[\]](http://docs.aws.amazon.com/zh_cn/lambda/latest/dg/images/sqs_tut_steps4.png)


创建一个可由 Lambda 函数用作事件源的 Amazon SQS 队列。Lambda 函数和 Amazon SQS 队列必须位于同一 AWS 区域。

**创建队列**

1. 打开 [Amazon SQS 控制台](https://console.aws.amazon.com/sqs)。

1. 选择**创建队列**。

1. 输入队列名称。将所有其他选项保留为默认设置。

1. 选择**创建队列**。

在创建队列后，记下其 ARN。在下一步中将该队列与您的 Lambda 函数关联时，您将需要此类信息。

## 配置事件源
<a name="with-sqs-attach-notification-configuration"></a>

![\[\]](http://docs.aws.amazon.com/zh_cn/lambda/latest/dg/images/sqs_tut_steps5.png)


创建[事件源映射](invocation-eventsourcemapping.md)，将 Amazon SQS 队列连接到 Lambda 函数。事件源映射会读取 Amazon SQS 队列并在添加新消息时调用 Lambda 函数。

要在 Amazon SQS 队列与 Lambda 函数之间创建映射，请使用以下 [create-event-source-mapping](https://awscli.amazonaws.com/v2/documentation/api/latest/reference/lambda/create-event-source-mapping.html) AWS CLI 命令。示例：

```
aws lambda create-event-source-mapping --function-name ProcessSQSRecord  --batch-size 10 \
--event-source-arn arn:aws:sqs:us-east-1:111122223333:my-queue
```

要获取事件源映射列表，请使用 [list-event-source-mappings](https://awscli.amazonaws.com/v2/documentation/api/2.1.29/reference/lambda/list-event-source-mappings.html) 命令。示例：

```
aws lambda list-event-source-mappings --function-name ProcessSQSRecord
```

## 发送测试消息
<a name="with-sqs-test-message"></a>

![\[\]](http://docs.aws.amazon.com/zh_cn/lambda/latest/dg/images/sqs_tut_steps6.png)


**将 Amazon SQS 消息发送到 Lambda 函数**

1. 打开 [Amazon SQS 控制台](https://console.aws.amazon.com/sqs)。

1. 选择您之前创建的队列。

1. 选择**发送和接收消息**。

1. 在**消息正文**下输入测试消息，例如“这是一条测试消息”。

1. 选择**发送消息**。

Lambda 轮询队列以获取更新。当有新消息时，Lambda 会使用该新的事件数据从队列中调用您的函数。如果该函数处理程序正常返回并且没有异常，则 Lambda 认为该消息得到成功处理并开始读取队列中的新消息。成功处理消息后，Lambda 从队列中将其自动删除。如果该处理程序引发异常，则 Lambda 认为消息的批量处理并未成功进行，并且 Lambda 会用相同的批量消息调用该函数。

## 查看 CloudWatch 日志
<a name="with-sqs-check-logs"></a>

![\[\]](http://docs.aws.amazon.com/zh_cn/lambda/latest/dg/images/sqs_tut_steps7.png)


**确认函数已处理消息**

1. 打开 Lamba 控制台的[函数](https://console.aws.amazon.com/lambda/home#/functions)页面。

1. 选择 **ProcessSQSRecord** 函数。

1. 选择 **Monitor (监控)**。

1. 选择**查看 CloudWatch 日志**。

1. 在 CloudWatch 控制台中，为该函数选择**日志流**。

1. 查找 `INFO` 日志。Lambda 函数会在此处记录消息正文。您应该能看到从 Amazon SQS 队列发送的消息。示例：

   ```
   2023-09-11T22:49:12.730Z b0c41e9c-0556-5a8b-af83-43e59efeec71 INFO Processed message this is a test message.
   ```

## 清除资源
<a name="cleanup"></a>

除非您想要保留为本教程创建的资源，否则可立即将其删除。通过删除您不再使用的 AWS 资源，可防止您的 AWS 账户 产生不必要的费用。

**删除执行角色**

1. 打开 IAM 控制台的[角色页面](https://console.aws.amazon.com/iam/home#/roles)。

1. 选择您创建的执行角色。

1. 选择**删除**。

1. 在文本输入字段中输入角色名称，然后选择**删除**。

**删除 Lambda 函数**

1. 打开 Lamba 控制台的 [Functions（函数）页面](https://console.aws.amazon.com/lambda/home#/functions)。

1. 选择您创建的函数。

1. 依次选择**操作**和**删除**。

1. 在文本输入字段中键入 **confirm**，然后选择 **Delete**（删除）。

**删除 Amazon SQS 队列**

1. 登录到 AWS 管理控制台 并打开 Amazon SQS 控制台，网址：[https://console.aws.amazon.com/vpc/](https://console.aws.amazon.com/sqs/)。

1. 选择创建的队列。

1. 选择**删除**。

1. 在文本输入字段中输入 **confirm**。

1. 选择**删除**。

# 教程：将跨账户 Amazon SQS 队列用作事件源
<a name="with-sqs-cross-account-example"></a>

在此教程中，您将创建一个使用来自不同 AWS 账户中 Amazon Simple Queue Service (Amazon SQS) 队列的消息的 Lambda 函数。本教程涉及两个 AWS 账户：**账户 A** 指包含您的 Lambda 函数的账户，而**账户 B** 指包含 Amazon SQS 队列的账户。

## 先决条件
<a name="with-sqs-cross-account-prepare"></a>

### 安装 AWS Command Line Interface
<a name="install_aws_cli"></a>

如果您尚未安装 AWS Command Line Interface，请按照[安装或更新最新版本的 AWS CLI](https://docs.aws.amazon.com/cli/latest/userguide/getting-started-install.html) 中的步骤进行安装。

本教程需要命令行终端或 Shell 来运行命令。在 Linux 和 macOS 中，可使用您首选的 Shell 和程序包管理器。

**注意**  
在 Windows 中，操作系统的内置终端不支持您经常与 Lambda 一起使用的某些 Bash CLI 命令（例如 `zip`）。[安装 Windows Subsystem for Linux](https://docs.microsoft.com/en-us/windows/wsl/install-win10)，获取 Ubuntu 和 Bash 与 Windows 集成的版本。

## 创建执行角色（账户 A）
<a name="with-sqs-cross-account-create-execution-role"></a>

在**账户 A** 中，创建一个[执行角色](lambda-intro-execution-role.md)，该角色向您的函数授予访问所需 AWS 资源的权限。

**创建执行角色**

1. 在 AWS Identity and Access Management IAM 控制台中，打开 [Roles (角色) 页面](https://console.aws.amazon.com/iam/home#/roles)。

1. 选择 **Create role**（创建角色）。

1. 创建具有以下属性的角色。
   + **Trusted entity (可信任的实体)** – **AWS Lambda**
   + **Permissions（权限）** – **AWSLambdaSQSQueueExecutionRole**
   + **Role name**（角色名称）– **cross-account-lambda-sqs-role**

**AWSLambdaSQSQueueExecutionRole** 策略具有该函数从 Amazon SQS 中读取项目并将日志写入 Amazon CloudWatch Logs 所需的权限。

## 创建函数（账户 A）
<a name="with-sqs-cross-account-create-function"></a>

在**账户 A** 中，创建一个处理您 Amazon SQS 消息的 Lambda 函数。Lambda 函数和 Amazon SQS 队列必须位于同一 AWS 区域。

下列 Node.js 代码示例将每条消息写入 CloudWatch Logs 中的日志。

**Example index.mjs**  

```
export const handler = async function(event, context) {
  event.Records.forEach(record => {
    const { body } = record;
    console.log(body);
  });
  return {};
}
```

**创建函数**
**注意**  
按照这些步骤在将创建一个 Node.js 函数。对于其他语言，步骤类似，但有些细节不同。

1. 将代码示例保存为名为 `index.mjs` 的文件。

1. 创建部署程序包。

   ```
   zip function.zip index.mjs
   ```

1. 使用 `create-function` AWS Command Line Interface (AWS CLI) 命令创建函数。将 `arn:aws:iam::111122223333:role/cross-account-lambda-sqs-role` 替换为您之前创建的执行角色的 ARN。

   ```
   aws lambda create-function --function-name CrossAccountSQSExample \
   --zip-file fileb://function.zip --handler index.handler --runtime nodejs24.x \
   --role arn:aws:iam::111122223333:role/cross-account-lambda-sqs-role
   ```

## 测试函数（账户 A）
<a name="with-sqs-cross-account-create-test-function"></a>

在**账户 A** 中，使用 `invoke` AWS CLI 命令和示例 Amazon SQS 事件手动调用 Lambda 函数。

如果该处理程序正常返回并且没有异常，则 Lambda 认为该消息得到成功处理并开始读取队列中的新消息。成功处理消息后，Lambda 从队列中将其自动删除。如果该处理程序引发异常，则 Lambda 认为消息的批量处理并未成功进行，并且 Lambda 会用相同的批量消息调用该函数。

1. 将下列 JSON 保存为名为 `input.txt` 的文件。

   ```
   {
       "Records": [
           {
               "messageId": "059f36b4-87a3-44ab-83d2-661975830a7d",
               "receiptHandle": "AQEBwJnKyrHigUMZj6rYigCgxlaS3SLy0a...",
               "body": "test",
               "attributes": {
                   "ApproximateReceiveCount": "1",
                   "SentTimestamp": "1545082649183",
                   "SenderId": "AIDAIENQZJOLO23YVJ4VO",
                   "ApproximateFirstReceiveTimestamp": "1545082649185"
               },
               "messageAttributes": {},
               "md5OfBody": "098f6bcd4621d373cade4e832627b4f6",
               "eventSource": "aws:sqs",
               "eventSourceARN": "arn:aws:sqs:us-east-1:111122223333:example-queue",
               "awsRegion": "us-east-1"
           }
       ]
   }
   ```

   上述 JSON 模拟 Amazon SQS 可能发送到您的 Lambda 函数的事件，其中 `"body"` 包含来自该队列的实际消息。

1. 运行以下 `invoke` AWS CLI 命令。

   ```
   aws lambda invoke --function-name CrossAccountSQSExample \
   --cli-binary-format raw-in-base64-out \
   --payload file://input.txt outputfile.txt
   ```

   如果使用 **cli-binary-format** 版本 2，则 AWS CLI 选项是必需的。要将其设为默认设置，请运行 `aws configure set cli-binary-format raw-in-base64-out`。有关更多信息，请参阅*版本 2 的 AWS Command Line Interface 用户指南*中的 [AWS CLI 支持的全局命令行选项](https://docs.aws.amazon.com/cli/latest/userguide/cli-configure-options.html#cli-configure-options-list)。

1. 在 `outputfile.txt` 文件中验证输出。

## 创建 Amazon SQS 队列（账户 B）。
<a name="with-sqs-cross-account-configure-sqs"></a>

在**账户 B** 中，创建一个可由**账户 A** 中 Lambda 函数用作事件源的 Amazon SQS 队列。Lambda 函数和 Amazon SQS 队列必须位于同一 AWS 区域。

**创建队列**

1. 打开 [Amazon SQS 控制台](https://console.aws.amazon.com/sqs)。

1. 选择**创建队列**。

1. 创建具有以下属性的队列。
   + **类型** - **标准**
   + **名称** - **LambdaCrossAccountQueue**
   + **配置** - 保留默认设置。
   + **访问策略** - 选择 **Advanced (高级)**。粘贴以下 JSON 策略。替换以下值：
     + `111122223333`：**账户 A** 的 AWS 账户 ID
     + `444455556666`：**账户 B** 的 AWS 账户 ID

------
#### [ JSON ]

****  

     ```
     {
         "Version":"2012-10-17",		 	 	 
         "Id": "Queue1_Policy_UUID",
         "Statement": [
             {
                 "Sid": "Queue1_AllActions",
                 "Effect": "Allow",
                 "Principal": {
                     "AWS": [
                         "arn:aws:iam::111122223333:role/cross-account-lambda-sqs-role"
                     ]
                 },
                 "Action": "sqs:*",
                 "Resource": "arn:aws:sqs:us-east-1:444455556666:LambdaCrossAccountQueue"
             }
         ]
     }
     ```

------

     此策略授予**账户 A** 中 Lambda 执行角色权限，以使用 Amazon SQS 队列中的消息。

1. 创建队列后，记录其 Amazon Resource Name (ARN)。在下一步中将该队列与您的 Lambda 函数关联时，您将需要此类信息。

## 配置事件源（帐户 A）
<a name="with-sqs-cross-account-event-source"></a>

在**账户 A** 中，通过运行以下 `create-event-source-mapping` AWS CLI 命令创建**账户 B** 中 Amazon SQS 队列和 Lambda 函数之间的事件源映射。将 `arn:aws:sqs:us-east-1:444455556666:LambdaCrossAccountQueue` 替换为在上一步中创建的 Amazon SQS 队列的 ARN。

```
aws lambda create-event-source-mapping --function-name CrossAccountSQSExample --batch-size 10 \
--event-source-arn arn:aws:sqs:us-east-1:444455556666:LambdaCrossAccountQueue
```

要获取您的事件源映射的列表，请运行下列命令。

```
aws lambda list-event-source-mappings --function-name CrossAccountSQSExample \
--event-source-arn arn:aws:sqs:us-east-1:444455556666:LambdaCrossAccountQueue
```

## 测试设置
<a name="with-sqs-final-integration-test-no-iam"></a>

现在，可以按以下方式测试设置：

1. 在**账户 B** 中，打开[Amazon SQS 控制台](https://console.aws.amazon.com/sqs)。

1. 选择之前创建的 **LambdaCrossAccountQueue**。

1. 选择 **Send and receive messages（发送和接收消息）**。

1. 在 **Message body (消息正文)** 中，输入测试消息。

1. 选择 **Send message**（发送消息）。

您在**账户 A** 中的 Lambda 函数应该收到消息。Lambda 将继续轮询队列以获取更新。当有新消息时，Lambda 会使用该新的事件数据从队列中调用您的函数。您的函数运行并在 Amazon CloudWatch 中创建日志。您可以在 [CloudWatch 控制台](https://console.aws.amazon.com/cloudwatch)中查看这些日志。

## 清除资源
<a name="cleanup"></a>

除非您想要保留为本教程创建的资源，否则可立即将其删除。通过删除您不再使用的 AWS 资源，可防止您的 AWS 账户 产生不必要的费用。

在**账户 A** 中，清理您的执行角色和 Lambda 函数。

**删除执行角色**

1. 打开 IAM 控制台的[角色页面](https://console.aws.amazon.com/iam/home#/roles)。

1. 选择您创建的执行角色。

1. 选择**删除**。

1. 在文本输入字段中输入角色名称，然后选择**删除**。

**删除 Lambda 函数**

1. 打开 Lamba 控制台的 [Functions（函数）页面](https://console.aws.amazon.com/lambda/home#/functions)。

1. 选择您创建的函数。

1. 依次选择**操作**和**删除**。

1. 在文本输入字段中键入 **confirm**，然后选择 **Delete**（删除）。

在**账户 B** 中，清理 Amazon SQS 队列。

**删除 Amazon SQS 队列**

1. 登录到 AWS 管理控制台 并打开 Amazon SQS 控制台，网址：[https://console.aws.amazon.com/vpc/](https://console.aws.amazon.com/sqs/)。

1. 选择创建的队列。

1. 选择**删除**。

1. 在文本输入字段中输入 **confirm**。

1. 选择**删除**。