Amazon SQS 请求批处理从版本 1 到版本 2 的更改 - AWS SDK for Java 2.x

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

Amazon SQS 请求批处理从版本 1 到版本 2 的更改

本主题详细介绍了适用于 Java 的 AWS SDK 版本 1 和版本 2 之间的 Amazon SQS 自动请求批处理的更改。

高级别更改

适用于 Java 的 AWS SDK 1.x 使用单独的 AmazonSQSBufferedAsyncClient 类执行客户端缓冲,该类需要显式初始化才能进行请求批处理。

AWS SDK for Java 2.x 使用 SqsAsyncBatchManager 简化并增强了缓冲功能。该接口的实现提供了直接与标准 SqsAsyncClient 集成的自动请求批处理功能。要了解 v2 的 SqsAsyncBatchManager,请参阅本指南中的 通过AWS SDK for Java 2.x 为 Amazon SQS 使用自动请求批处理 主题。

更改 v1 v2

Maven 依赖项

<dependencyManagement> <dependencies> <dependency> <groupId>com.amazonaws</groupId> <artifactId>aws-java-sdk-bom</artifactId> <version>1.12.7821</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement> <dependencies> <dependency> <groupId>com.amazonaws</groupId> <artifactId>aws-java-sdk-sqs</artifactId> </dependency> </dependencies>
<dependencyManagement> <dependencies> <dependency> <groupId>software.amazon.awssdk</groupId> <artifactId>bom</artifactId> <version>2.31.152</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement> <dependencies> <dependency> <groupId>software.amazon.awssdk</groupId> <artifactId>sqs</artifactId> </dependency> </dependencies>
软件包名称 com.amazonaws.services.sqs.buffered software.amazon.awssdk.services.sqs.batchmanager
类名

AmazonSQSBufferedAsyncClient

SqsAsyncBatchManager

1 最新版本2 最新版本

使用 SQS 自动请求批处理

更改 v1 v2
创建批量管理器
AmazonSQSAsync sqsAsync = new AmazonSQSAsyncClient(); AmazonSQSAsync bufferedSqs = new AmazonSQSBufferedAsyncClient(sqsAsync);
SqsAsyncClient asyncClient = SqsAsyncClient.create(); SqsAsyncBatchManager sqsAsyncBatchManager = asyncClient.batchManager();
使用自定义配置创建批量管理器
AmazonSQSAsync sqsAsync = new AmazonSQSAsyncClient(); QueueBufferConfig queueBufferConfig = new QueueBufferConfig() .withMaxBatchOpenMs(200) .withMaxBatchSize(10) .withMinReceiveWaitTimeMs(1000) .withVisibilityTimeoutSeconds(20) .withReceiveMessageAttributeNames(messageAttributeValues); AmazonSQSAsync bufferedSqs = new AmazonSQSBufferedAsyncClient(sqsAsync, queueBufferConfig);
BatchOverrideConfiguration batchOverrideConfiguration = BatchOverrideConfiguration.builder() .sendRequestFrequency(Duration.ofMillis(200)) .maxBatchSize(10) .receiveMessageMinWaitDuration(Duration.ofMillis(1000)) .receiveMessageVisibilityTimeout(Duration.ofSeconds(20)) .receiveMessageSystemAttributeNames(messageSystemAttributeNames) .receiveMessageAttributeNames(messageAttributeValues) .build(); SqsAsyncBatchManager sqsAsyncBatchManager = SqsAsyncBatchManager.builder() .overrideConfiguration(batchOverrideConfiguration) .client(SqsAsyncClient.create()) .scheduledExecutor(Executors.newScheduledThreadPool(8)) .build();
发送消息
Future<SendMessageResult> sendResultFuture = bufferedSqs.sendMessageAsync(new SendMessageRequest() .withQueueUrl(queueUrl) .withMessageBody(body));
CompletableFuture<SendMessageResponse> sendCompletableFuture = sqsAsyncBatchManager.sendMessage( SendMessageRequest.builder() .queueUrl(queueUrl) .messageBody(body) .build());
删除消息
Future<DeleteMessageResult> deletResultFuture = bufferedSqs.deleteMessageAsync(new DeleteMessageRequest() .withQueueUrl(queueUrl));
CompletableFuture<DeleteMessageResponse> deleteResultCompletableFuture = sqsAsyncBatchManager.deleteMessage( DeleteMessageRequest.builder() .queueUrl(queueUrl) .build());
更改消息的可见性
Future<ChangeMessageVisibilityResult> changeVisibilityResultFuture = bufferedSqs.changeMessageVisibilityAsync (new ChangeMessageVisibilityRequest() .withQueueUrl(queueUrl) .withVisibilityTimeout(20));
CompletableFuture<ChangeMessageVisibilityResponse> changeResponseCompletableFuture = sqsAsyncBatchManager.changeMessageVisibility( ChangeMessageVisibilityRequest.builder() .queueUrl(queueUrl) .visibilityTimeout(20) .build());
接收消息
ReceiveMessageResult receiveResult = bufferedSqs.receiveMessage( new ReceiveMessageRequest() .withQueueUrl(queueUrl));
CompletableFuture<ReceiveMessageResponse> responseCompletableFuture = sqsAsyncBatchManager.receiveMessage( ReceiveMessageRequest.builder() .queueUrl(queueUrl) .build());

异步返回类型差异

更改 v1 v2
返回类型 Future<ResultType> CompletableFuture<ResponseType>
回调机制 使用单独的 onSuccessonError 方法需要 AsyncHandler 使用 JDK 提供的 CompletableFuture API,例如 whenComplete()thenCompose()thenApply()
异常处理 使用 AsyncHandler#onError() 方法 使用 JDK 提供的 CompletableFuture API,例如 exceptionally()handle()whenComplete()
取消 通过 Future.cancel() 实现基本支持 取消父 CompletableFuture 会自动取消链中的所有相关 Future

异步完成处理差异

更改 v1 v2
响应处理程序实现
Future<ReceiveMessageResult> future = bufferedSqs.receiveMessageAsync( receiveRequest, new AsyncHandler<ReceiveMessageRequest, ReceiveMessageResult>() { @Override public void onSuccess(ReceiveMessageRequest request, ReceiveMessageResult result) { List<Message> messages = result.getMessages(); System.out.println("Received " + messages.size() + " messages"); for (Message message : messages) { System.out.println("Message ID: " + message.getMessageId()); System.out.println("Body: " + message.getBody()); } } @Override public void onError(Exception e) { System.err.println("Error receiving messages: " + e.getMessage()); e.printStackTrace(); } } );
CompletableFuture<ReceiveMessageResponse> completableFuture = sqsAsyncBatchManager .receiveMessage(ReceiveMessageRequest.builder() .queueUrl(queueUrl).build()) .whenComplete((receiveMessageResponse, throwable) -> { if (throwable != null) { System.err.println("Error receiving messages: " + throwable.getMessage()); throwable.printStackTrace(); } else { List<Message> messages = receiveMessageResponse.messages(); System.out.println("Received " + messages.size() + " messages"); for (Message message : messages) { System.out.println("Message ID: " + message.messageId()); System.out.println("Body: " + message.body()); } } });

关键配置参数

参数 v1 v2
最大批次大小 maxBatchSize(每批默认为 10 个请求) maxBatchSize(每批默认为 10 个请求)
批次等待时间 maxBatchOpenMs(默认值为 200 毫秒) sendRequestFrequency(默认值为 200 毫秒)
可见性超时 visibilityTimeoutSeconds(队列默认值为 -1) receiveMessageVisibilityTimeout(队列默认值)
最小等待时间 longPollWaitTimeoutSeconds(如果 longPoll 为 true,则为 20 秒) receiveMessageMinWaitDuration(默认值为 50 毫秒)
消息属性 使用 ReceiveMessageRequest 进行设置 receiveMessageAttributeNames(默认值为无)
系统属性 使用 ReceiveMessageRequest 进行设置 receiveMessageSystemAttributeNames(默认值为无)
长轮询 longPoll(默认值为 true) 不支持,以便避免让已建立的连接一直处于等待状态,直到服务器发送消息
长时间轮询的最长等待时间 longPollWaitTimeoutSeconds(默认值为 20 秒) 不支持,以便避免让已建立的连接一直处于等待状态,直到服务器发送消息
在客户端预取和存储接收批次的最大数量 maxDoneReceiveBatches(10 个批次) 不支持,因为这在内部处理
同时处理的最大活动出站批次数量 maxInflightOutboundBatches(默认为 5 个批次) 不支持,因为这在内部处理
同时处理的最大活动接收批次数量 maxInflightReceiveBatches(默认为 10 个批次) 不支持,因为这在内部处理