通过AWS SDK for Java 2.x 为 Amazon SQS 使用自动请求批处理
Amazon SQS 的自动请求批处理 API 是一个高级库,提供了一种高效的方法来批处理和缓冲 SQS 操作的请求。通过使用批处理 API,您可以减少 SQS 的请求数量,从而提高吞吐量并更大限度地降低成本。
由于批处理 API 方法与 SqsAsyncClient 方法(sendMessage、changeMessageVisibility、deleteMessage、receiveMessage)相匹配,因此您可以将批处理 API 作为直接替代方案使用,只需做极少的代码修改。
本主题概述了如何配置和使用 Amazon SQS 的自动请求批处理 API。
检查先决条件
您需要使用适用于 Java 的 SDK 2.x 的版本 2.28.0 或更高版本才能访问批处理 API。您的 Maven pom.xml 应至少包含以下元素。
<dependencyManagement>
<dependencies>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>bom</artifactId>
<version>2.28.231</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>sqs</artifactId>
</dependency>
</dependencies>
1 最新版本
创建批量管理器
自动请求批处理 API 由 SqsAsyncBatchManager
使用 SqsAsyncClient 的默认配置
创建批处理管理器的最简单方法是在现有 SqsAsyncClientbatchManager 工厂方法。以下代码段演示了这种简单方法。
SqsAsyncClient asyncClient = SqsAsyncClient.create(); SqsAsyncBatchManager sqsAsyncBatchManager = asyncClient.batchManager();
当您使用这种方法时,SqsAsyncBatchManager 实例会使用 覆盖 SqsAsyncBatchManager 的配置设置 部分表格中显示的默认值。此外,SqsAsyncBatchManager 实例会使用其创建来源的 SqsAsyncClient 实例的 ExecutorService。
使用 SqsAsyncBatchManager.Builder 的自定义配置
对于更高级的使用案例,您可以使用 SqsAsyncBatchManager.BuilderSqsAsyncBatchManager 实例,您可以微调批处理行为。以下代码段举例说明了如何使用生成器来自定义批处理行为。
SqsAsyncBatchManager batchManager = SqsAsyncBatchManager.builder() .client(SqsAsyncClient.create()) .scheduledExecutor(Executors.newScheduledThreadPool(5)) .overrideConfiguration(b -> b .receiveMessageMinWaitDuration(Duration.ofSeconds(10)) .receiveMessageVisibilityTimeout(Duration.ofSeconds(1)) .receiveMessageAttributeNames(Collections.singletonList("*")) .receiveMessageSystemAttributeNames(Collections.singletonList(MessageSystemAttributeName.ALL))) .build();
使用这种方法时,您可以调整 覆盖 SqsAsyncBatchManager 的配置设置 部分表格中显示的 BatchOverrideConfiguration 对象的设置。您也可以使用这种方法为批处理管理器提供自定义 ScheduledExecutorService
发送消息
要使用批处理管理器发送消息,请使用 SqsAsyncBatchManager#sendMessage 方法。SDK 会缓冲请求,并在达到 maxBatchSize 或 sendRequestFrequency 值时批量发送请求。
以下示例展示了一个 sendMessage 请求后立即紧跟着另一个请求。在这种情况下,SDK 在单个批次中发送这两条消息。
// Sending the first message CompletableFuture<SendMessageResponse> futureOne = sqsAsyncBatchManager.sendMessage(r -> r.messageBody("One").queueUrl("queue")); // Sending the second message CompletableFuture<SendMessageResponse> futureTwo = sqsAsyncBatchManager.sendMessage(r -> r.messageBody("Two").queueUrl("queue")); // Waiting for both futures to complete and retrieving the responses SendMessageResponse messageOne = futureOne.join(); SendMessageResponse messageTwo = futureTwo.join();
更改消息的可见性超时
您可以使用 SqsAsyncBatchManager#changeMessageVisibilitymaxBatchSize 或 sendRequestFrequency 值时批量发送请求。
以下示例显示了如何调用 changeMessageVisibility 方法。
CompletableFuture<ChangeMessageVisibilityResponse> futureOne = sqsAsyncBatchManager.changeMessageVisibility(r -> r.receiptHandle("receiptHandle") .queueUrl("queue")); ChangeMessageVisibilityResponse response = futureOne.join();
删除消息
您可以使用 SqsAsyncBatchManager#deleteMessagemaxBatchSize 或 sendRequestFrequency 值时批量发送请求。
以下示例说明了如何调用 deleteMessage 方法。
CompletableFuture<DeleteMessageResponse> futureOne = sqsAsyncBatchManager.deleteMessage(r -> r.receiptHandle("receiptHandle") .queueUrl("queue")); DeleteMessageResponse response = futureOne.join();
接收消息
使用默认设置
当您在应用程序中轮询 SqsAsyncBatchManager#receiveMessage 方法时,批处理管理器会从其内部缓冲区获取消息,SDK 会在后台自动更新缓冲区。
以下示例显示了如何调用 receiveMessage 方法。
CompletableFuture<ReceiveMessageResponse> responseFuture = sqsAsyncBatchManager.receiveMessage(r -> r.queueUrl("queueUrl"));
使用自定义设置
如果您想进一步自定义请求(例如设置自定义等待时间和指定要检索的消息数量),则可以按以下示例所示自定义请求。
CompletableFuture<ReceiveMessageResponse> response = sqsAsyncBatchManager.receiveMessage(r -> r.queueUrl("queueUrl") .waitTimeSeconds(5) .visibilityTimeout(20));
注意
如果您使用包括以下任何参数的 ReceiveMessageRequest 来调用 receiveMessage,则 SDK 会绕过批处理管理器并发送常规异步 receiveMessage 请求:
-
messageAttributeNames -
messageSystemAttributeNames -
messageSystemAttributeNamesWithStrings -
overrideConfiguration
覆盖 SqsAsyncBatchManager 的配置设置
创建 SqsAsyncBatchManager 实例时,可以调整以下设置。BatchOverrideConfiguration.Builder
| 设置 | 描述 | 默认值 |
|---|---|---|
maxBatchSize |
每个 SendMessageBatchRequest、ChangeMessageVisibilityBatchRequest 或 DeleteMessageBatchRequest 每批次的最大请求数。最大值为 10。 |
10 |
sendRequestFrequency |
发送批次之前的时间,除非提前达到 |
200 毫秒 |
receiveMessageVisibilityTimeout |
消息的可见性超时。如果未设置,则使用队列的默认值。 | 队列的默认值 |
receiveMessageMinWaitDuration |
receiveMessage 请求的最短等待时间。避免设置为 0,以防浪费 CPU。 |
50 毫秒 |
receiveMessageSystemAttributeNames |
receiveMessage 调用请求的系统属性名称 |
无 |
receiveMessageAttributeNames |
receiveMessage 调用请求的属性名称列表。 |
无 |