本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
Amazon SQS 请求批处理从版本 1 到版本 2 的更改
本主题详细介绍了适用于 Java 的 AWS SDK 版本 1 和版本 2 之间的 Amazon SQS 自动请求批处理的更改。
高级别更改
适用于 Java 的 AWS SDK 1.x 使用单独的 AmazonSQSBufferedAsyncClient 类执行客户端缓冲,该类需要显式初始化才能进行请求批处理。
AWS SDK for Java 2.x 使用 SqsAsyncBatchManager 简化并增强了缓冲功能。该接口的实现提供了直接与标准 SqsAsyncClient 集成的自动请求批处理功能。要了解 v2 的 SqsAsyncBatchManager,请参阅本指南中的 通过AWS SDK for Java 2.x 为 Amazon SQS 使用自动请求批处理 主题。
| 更改 |
v1 |
v2 |
|
Maven 依赖项
|
<dependencyManagement>
<dependencies>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-bom</artifactId>
<version>1.12.7821</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-sqs</artifactId>
</dependency>
</dependencies>
|
<dependencyManagement>
<dependencies>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>bom</artifactId>
<version>2.31.152</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>sqs</artifactId>
</dependency>
</dependencies>
|
| 软件包名称 |
com.amazonaws.services.sqs.buffered |
software.amazon.awssdk.services.sqs.batchmanager |
| 类名 |
AmazonSQSBufferedAsyncClient
|
SqsAsyncBatchManager |
1 最新版本。2 最新版本。
使用 SQS 自动请求批处理
| 更改 |
v1 |
v2 |
| 创建批量管理器 |
AmazonSQSAsync sqsAsync = new AmazonSQSAsyncClient();
AmazonSQSAsync bufferedSqs = new
AmazonSQSBufferedAsyncClient(sqsAsync);
|
SqsAsyncClient asyncClient = SqsAsyncClient.create();
SqsAsyncBatchManager sqsAsyncBatchManager =
asyncClient.batchManager();
|
| 使用自定义配置创建批量管理器 |
AmazonSQSAsync sqsAsync = new AmazonSQSAsyncClient();
QueueBufferConfig queueBufferConfig = new QueueBufferConfig()
.withMaxBatchOpenMs(200)
.withMaxBatchSize(10)
.withMinReceiveWaitTimeMs(1000)
.withVisibilityTimeoutSeconds(20)
.withReceiveMessageAttributeNames(messageAttributeValues);
AmazonSQSAsync bufferedSqs =
new AmazonSQSBufferedAsyncClient(sqsAsync, queueBufferConfig);
|
BatchOverrideConfiguration batchOverrideConfiguration =
BatchOverrideConfiguration.builder()
.sendRequestFrequency(Duration.ofMillis(200))
.maxBatchSize(10)
.receiveMessageMinWaitDuration(Duration.ofMillis(1000))
.receiveMessageVisibilityTimeout(Duration.ofSeconds(20))
.receiveMessageSystemAttributeNames(messageSystemAttributeNames)
.receiveMessageAttributeNames(messageAttributeValues)
.build();
SqsAsyncBatchManager sqsAsyncBatchManager = SqsAsyncBatchManager.builder()
.overrideConfiguration(batchOverrideConfiguration)
.client(SqsAsyncClient.create())
.scheduledExecutor(Executors.newScheduledThreadPool(8))
.build();
|
| 发送消息 |
Future<SendMessageResult> sendResultFuture =
bufferedSqs.sendMessageAsync(new SendMessageRequest()
.withQueueUrl(queueUrl)
.withMessageBody(body));
|
CompletableFuture<SendMessageResponse> sendCompletableFuture =
sqsAsyncBatchManager.sendMessage(
SendMessageRequest.builder()
.queueUrl(queueUrl)
.messageBody(body)
.build());
|
| 删除消息 |
Future<DeleteMessageResult> deletResultFuture =
bufferedSqs.deleteMessageAsync(new DeleteMessageRequest()
.withQueueUrl(queueUrl));
|
CompletableFuture<DeleteMessageResponse> deleteResultCompletableFuture
= sqsAsyncBatchManager.deleteMessage(
DeleteMessageRequest.builder()
.queueUrl(queueUrl)
.build());
|
| 更改消息的可见性 |
Future<ChangeMessageVisibilityResult> changeVisibilityResultFuture =
bufferedSqs.changeMessageVisibilityAsync
(new ChangeMessageVisibilityRequest()
.withQueueUrl(queueUrl)
.withVisibilityTimeout(20));
|
CompletableFuture<ChangeMessageVisibilityResponse> changeResponseCompletableFuture
= sqsAsyncBatchManager.changeMessageVisibility(
ChangeMessageVisibilityRequest.builder()
.queueUrl(queueUrl)
.visibilityTimeout(20)
.build());
|
| 接收消息 |
ReceiveMessageResult receiveResult =
bufferedSqs.receiveMessage(
new ReceiveMessageRequest()
.withQueueUrl(queueUrl));
|
CompletableFuture<ReceiveMessageResponse>
responseCompletableFuture = sqsAsyncBatchManager.receiveMessage(
ReceiveMessageRequest.builder()
.queueUrl(queueUrl)
.build());
|
异步返回类型差异
| 更改 |
v1 |
v2 |
| 返回类型 |
Future<ResultType> |
CompletableFuture<ResponseType> |
| 回调机制 |
使用单独的 onSuccess 和 onError 方法需要 AsyncHandler |
使用 JDK 提供的 CompletableFuture API,例如 whenComplete()、thenCompose()、thenApply() |
| 异常处理 |
使用 AsyncHandler#onError() 方法 |
使用 JDK 提供的 CompletableFuture API,例如 exceptionally()、handle() 或 whenComplete() |
| 取消 |
通过 Future.cancel() 实现基本支持 |
取消父 CompletableFuture 会自动取消链中的所有相关 Future |
异步完成处理差异
| 更改 |
v1 |
v2 |
| 响应处理程序实现 |
Future<ReceiveMessageResult> future = bufferedSqs.receiveMessageAsync(
receiveRequest,
new AsyncHandler<ReceiveMessageRequest, ReceiveMessageResult>() {
@Override
public void onSuccess(ReceiveMessageRequest request,
ReceiveMessageResult result) {
List<Message> messages = result.getMessages();
System.out.println("Received " + messages.size() + " messages");
for (Message message : messages) {
System.out.println("Message ID: " + message.getMessageId());
System.out.println("Body: " + message.getBody());
}
}
@Override
public void onError(Exception e) {
System.err.println("Error receiving messages: " + e.getMessage());
e.printStackTrace();
}
}
);
|
CompletableFuture<ReceiveMessageResponse> completableFuture = sqsAsyncBatchManager
.receiveMessage(ReceiveMessageRequest.builder()
.queueUrl(queueUrl).build())
.whenComplete((receiveMessageResponse, throwable) -> {
if (throwable != null) {
System.err.println("Error receiving messages: " + throwable.getMessage());
throwable.printStackTrace();
} else {
List<Message> messages = receiveMessageResponse.messages();
System.out.println("Received " + messages.size() + " messages");
for (Message message : messages) {
System.out.println("Message ID: " + message.messageId());
System.out.println("Body: " + message.body());
}
}
});
|
关键配置参数
| 参数 |
v1 |
v2 |
| 最大批次大小 |
maxBatchSize(每批默认为 10 个请求) |
maxBatchSize(每批默认为 10 个请求) |
| 批次等待时间 |
maxBatchOpenMs(默认值为 200 毫秒) |
sendRequestFrequency(默认值为 200 毫秒) |
| 可见性超时 |
visibilityTimeoutSeconds(队列默认值为 -1) |
receiveMessageVisibilityTimeout(队列默认值) |
| 最小等待时间 |
longPollWaitTimeoutSeconds(如果 longPoll 为 true,则为 20 秒) |
receiveMessageMinWaitDuration(默认值为 50 毫秒) |
| 消息属性 |
使用 ReceiveMessageRequest 进行设置 |
receiveMessageAttributeNames(默认值为无) |
| 系统属性 |
使用 ReceiveMessageRequest 进行设置 |
receiveMessageSystemAttributeNames(默认值为无) |
| 长轮询 |
longPoll(默认值为 true) |
不支持,以便避免让已建立的连接一直处于等待状态,直到服务器发送消息 |
| 长时间轮询的最长等待时间 |
longPollWaitTimeoutSeconds(默认值为 20 秒) |
不支持,以便避免让已建立的连接一直处于等待状态,直到服务器发送消息 |
| 在客户端预取和存储接收批次的最大数量 |
maxDoneReceiveBatches(10 个批次) |
不支持,因为这在内部处理 |
| 同时处理的最大活动出站批次数量 |
maxInflightOutboundBatches(默认为 5 个批次) |
不支持,因为这在内部处理 |
| 同时处理的最大活动接收批次数量 |
maxInflightReceiveBatches(默认为 10 个批次) |
不支持,因为这在内部处理 |