本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
将 Amazon SQS 的自动请求批处理与 AWS SDK for Java 2.x
Amazon SQS 的自动请求批处理 API 是一个高级库,它提供了一种有效的方法来批处理和缓冲 SQS 操作的请求。通过使用批处理 API,您可以减少对 SQS 的请求数量,从而提高吞吐量并最大限度地降低成本。
由于批处理 API 方法与方法(sendMessage、、、changeMessageVisibilitydeleteMessage、receiveMessage)相匹配,因此您可以将批处理 API 用作直接替代SqsAsyncClient方法,只需进行最少的更改。
本主题概述了如何配置和使用适用于 Amazon SQS 的自动请求批处理 API。
检查先决条件
您需要使用适用于 Java 2.x 的 SDK 2.28.0 或更高版本才能访问批处理 API。你的 Maven 至少pom.xml应该包含以下元素。
<dependencyManagement>
<dependencies>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>bom</artifactId>
<version>2.28.231</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>sqs</artifactId>
</dependency>
</dependencies>
1 最新版本
创建批处理管理器
自动请求批处理 API 由SqsAsyncBatchManager
使用默认配置 SqsAsyncClient
创建批处理管理器的最简单方法是在现有SqsAsyncClientbatchManager工厂方法。以下片段显示了简单的方法。
SqsAsyncClient asyncClient = SqsAsyncClient.create(); SqsAsyncBatchManager sqsAsyncBatchManager = asyncClient.batchManager();
当您使用这种方法时,SqsAsyncBatchManager实例将使用该覆盖的配置设置 SqsAsyncBatchManager部分表格中显示的默认值。此外,该SqsAsyncBatchManager实例使用从中创建它的SqsAsyncClient实例的。ExecutorService
使用自定义配置 SqsAsyncBatchManager.Builder
对于更高级的用例,您可以使用自定义批处理管理器SqsAsyncBatchManager.BuilderSqsAsyncBatchManager实例,您可以微调批处理行为。以下代码段显示了如何使用构建器自定义批处理行为的示例。
SqsAsyncBatchManager batchManager = SqsAsyncBatchManager.builder() .client(SqsAsyncClient.create()) .scheduledExecutor(Executors.newScheduledThreadPool(5)) .overrideConfiguration(b -> b .receiveMessageMinWaitDuration(Duration.ofSeconds(10)) .receiveMessageVisibilityTimeout(Duration.ofSeconds(1)) .receiveMessageAttributeNames(Collections.singletonList("*")) .receiveMessageSystemAttributeNames(Collections.singletonList(MessageSystemAttributeName.ALL))) .build();
使用这种方法时,您可以调整该覆盖的配置设置 SqsAsyncBatchManager部分表格中显示的BatchOverrideConfiguration对象的设置。您也可以使用这种方法ScheduledExecutorService
发送消息
要使用批处理管理器发送消息,请使用SqsAsyncBatchManager#sendMessage方法。SDK 会缓冲请求,并在达到maxBatchSize或sendRequestFrequency值时将其作为批量发送。
以下示例显示了sendMessage紧随其后的是另一个请求的请求。在这种情况下,SDK 将同时发送两条消息。
// Sending the first message CompletableFuture<SendMessageResponse> futureOne = sqsAsyncBatchManager.sendMessage(r -> r.messageBody("One").queueUrl("queue")); // Sending the second message CompletableFuture<SendMessageResponse> futureTwo = sqsAsyncBatchManager.sendMessage(r -> r.messageBody("Two").queueUrl("queue")); // Waiting for both futures to complete and retrieving the responses SendMessageResponse messageOne = futureOne.join(); SendMessageResponse messageTwo = futureTwo.join();
更改消息可见性超时
您可以使用SqsAsyncBatchManager#changeMessageVisibilitymaxBatchSize或sendRequestFrequency值时将其作为批量发送。
以下示例说明如何调用该changeMessageVisibility方法。
CompletableFuture<ChangeMessageVisibilityResponse> futureOne = sqsAsyncBatchManager.changeMessageVisibility(r -> r.receiptHandle("receiptHandle") .queueUrl("queue")); ChangeMessageVisibilityResponse response = futureOne.join();
删除消息
您可以使用SqsAsyncBatchManager#deleteMessagemaxBatchSize或sendRequestFrequency值时将其作为批量发送。
以下示例显示了如何调用该deleteMessage方法。
CompletableFuture<DeleteMessageResponse> futureOne = sqsAsyncBatchManager.deleteMessage(r -> r.receiptHandle("receiptHandle") .queueUrl("queue")); DeleteMessageResponse response = futureOne.join();
接收消息
使用默认设置
当您在应用程序中轮询该SqsAsyncBatchManager#receiveMessage方法时,批处理管理器会从其内部缓冲区获取消息,SDK 会在后台自动更新这些消息。
以下示例说明如何调用该receiveMessage方法。
CompletableFuture<ReceiveMessageResponse> responseFuture = sqsAsyncBatchManager.receiveMessage(r -> r.queueUrl("queueUrl"));
使用自定义设置
如果您想进一步自定义请求,例如通过设置自定义等待时间和指定要检索的消息数量,则可以自定义请求,如以下示例所示。
CompletableFuture<ReceiveMessageResponse> response = sqsAsyncBatchManager.receiveMessage(r -> r.queueUrl("queueUrl") .waitTimeSeconds(5) .visibilityTimeout(20));
注意
如果您receiveMessage使用ReceiveMessageRequest包含以下任何参数的调用,则 SDK 会绕过批处理管理器并发送常规异步receiveMessage请求:
-
messageAttributeNames -
messageSystemAttributeNames -
messageSystemAttributeNamesWithStrings -
overrideConfiguration
覆盖的配置设置 SqsAsyncBatchManager
创建SqsAsyncBatchManager实例时,您可以调整以下设置。上提供了以下设置列表BatchOverrideConfiguration.Builder
| 设置 | 描述 | 默认值 |
|---|---|---|
maxBatchSize |
SendMessageBatchRequest、ChangeMessageVisibilityBatchRequest或每批次的最大请求数DeleteMessageBatchRequest。最大值为 10。 |
10 |
sendRequestFrequency |
发送批次之前的时间, |
200 毫秒 |
receiveMessageVisibilityTimeout |
消息的可见性超时。如果未设置,则使用队列的默认值。 | 队列的默认值 |
receiveMessageMinWaitDuration |
receiveMessage请求的最短等待时间。避免设置为 0 以防浪费 CPU。 |
50 毫秒 |
receiveMessageSystemAttributeNames |
请求receiveMessage呼叫的系统属性名称 |
无 |
receiveMessageAttributeNames |
请求receiveMessage呼叫的属性名称列表。 |
无 |