バージョン 1 からバージョン 2 での Amazon SQS リクエストの自動バッチ処理の変更 - AWS SDK for Java 2.x

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

バージョン 1 からバージョン 2 での Amazon SQS リクエストの自動バッチ処理の変更

このトピックでは、AWS SDK for Java のバージョン 1 とバージョン 2 での Amazon SQS の自動リクエストバッチ処理の変更について詳しく説明します。

高レベル変更

AWS SDK for Java 1.x は、リクエストバッチ処理に明示的な初期化を必要とする別の AmazonSQSBufferedAsyncClient クラスを使用してクライアント側のバッファリングを実行します。

AWS SDK for Java 2.x は、SqsAsyncBatchManager を使用してバッファリング機能を簡素化および強化します。このインターフェイスの実装により、標準の SqsAsyncClient と直接統合された自動リクエストバッチ処理機能が提供されます。v2 の SqsAsyncBatchManager の詳細については、このガイドの AWS SDK for Java 2.x で Amazon SQS の自動リクエストバッチ処理を使用する トピックを参照してください。

変更 v1 v2

Maven の依存関係

<dependencyManagement> <dependencies> <dependency> <groupId>com.amazonaws</groupId> <artifactId>aws-java-sdk-bom</artifactId> <version>1.12.7821</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement> <dependencies> <dependency> <groupId>com.amazonaws</groupId> <artifactId>aws-java-sdk-sqs</artifactId> </dependency> </dependencies>
<dependencyManagement> <dependencies> <dependency> <groupId>software.amazon.awssdk</groupId> <artifactId>bom</artifactId> <version>2.31.152</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement> <dependencies> <dependency> <groupId>software.amazon.awssdk</groupId> <artifactId>sqs</artifactId> </dependency> </dependencies>
パッケージ名 com.amazonaws.services.sqs.buffered software.amazon.awssdk.services.sqs.batchmanager
クラス名

AmazonSQSBufferedAsyncClient

SqsAsyncBatchManager

1 最新バージョン2 最新バージョン

自動 SQS リクエストバッチ処理の使用

変更 v1 v2
バッチマネージャーを作成する
AmazonSQSAsync sqsAsync = new AmazonSQSAsyncClient(); AmazonSQSAsync bufferedSqs = new AmazonSQSBufferedAsyncClient(sqsAsync);
SqsAsyncClient asyncClient = SqsAsyncClient.create(); SqsAsyncBatchManager sqsAsyncBatchManager = asyncClient.batchManager();
カスタム設定でバッチマネージャーを作成する
AmazonSQSAsync sqsAsync = new AmazonSQSAsyncClient(); QueueBufferConfig queueBufferConfig = new QueueBufferConfig() .withMaxBatchOpenMs(200) .withMaxBatchSize(10) .withMinReceiveWaitTimeMs(1000) .withVisibilityTimeoutSeconds(20) .withReceiveMessageAttributeNames(messageAttributeValues); AmazonSQSAsync bufferedSqs = new AmazonSQSBufferedAsyncClient(sqsAsync, queueBufferConfig);
BatchOverrideConfiguration batchOverrideConfiguration = BatchOverrideConfiguration.builder() .sendRequestFrequency(Duration.ofMillis(200)) .maxBatchSize(10) .receiveMessageMinWaitDuration(Duration.ofMillis(1000)) .receiveMessageVisibilityTimeout(Duration.ofSeconds(20)) .receiveMessageSystemAttributeNames(messageSystemAttributeNames) .receiveMessageAttributeNames(messageAttributeValues) .build(); SqsAsyncBatchManager sqsAsyncBatchManager = SqsAsyncBatchManager.builder() .overrideConfiguration(batchOverrideConfiguration) .client(SqsAsyncClient.create()) .scheduledExecutor(Executors.newScheduledThreadPool(8)) .build();
メッセージを送信する
Future<SendMessageResult> sendResultFuture = bufferedSqs.sendMessageAsync(new SendMessageRequest() .withQueueUrl(queueUrl) .withMessageBody(body));
CompletableFuture<SendMessageResponse> sendCompletableFuture = sqsAsyncBatchManager.sendMessage( SendMessageRequest.builder() .queueUrl(queueUrl) .messageBody(body) .build());
メッセージの削除
Future<DeleteMessageResult> deletResultFuture = bufferedSqs.deleteMessageAsync(new DeleteMessageRequest() .withQueueUrl(queueUrl));
CompletableFuture<DeleteMessageResponse> deleteResultCompletableFuture = sqsAsyncBatchManager.deleteMessage( DeleteMessageRequest.builder() .queueUrl(queueUrl) .build());
メッセージの可視性を変更する
Future<ChangeMessageVisibilityResult> changeVisibilityResultFuture = bufferedSqs.changeMessageVisibilityAsync (new ChangeMessageVisibilityRequest() .withQueueUrl(queueUrl) .withVisibilityTimeout(20));
CompletableFuture<ChangeMessageVisibilityResponse> changeResponseCompletableFuture = sqsAsyncBatchManager.changeMessageVisibility( ChangeMessageVisibilityRequest.builder() .queueUrl(queueUrl) .visibilityTimeout(20) .build());
メッセージを受信する
ReceiveMessageResult receiveResult = bufferedSqs.receiveMessage( new ReceiveMessageRequest() .withQueueUrl(queueUrl));
CompletableFuture<ReceiveMessageResponse> responseCompletableFuture = sqsAsyncBatchManager.receiveMessage( ReceiveMessageRequest.builder() .queueUrl(queueUrl) .build());

非同期戻り値の型の違い

変更 v1 v2
戻り型 Future<ResultType> CompletableFuture<ResponseType>
コールバックメカニズム 個別の onSuccess メソッドと onError メソッドを持つ AsyncHandler が必要です whenComplete()thenCompose()thenApply() など、JDK が提供する CompletableFuture API を使用します。
例外処理 AsyncHandler#onError() メソッドを使用する exceptionally()handle()whenComplete() など、JDK が提供する CompletableFuture API を使用します。
キャンセル Future.cancel() による基本的なサポート 親の CompletableFuture をキャンセルすると、チェーン内のすべての依存する Future が自動的にキャンセルされます。

非同期完了処理の違い

変更 v1 v2
レスポンスハンドラーの実装
Future<ReceiveMessageResult> future = bufferedSqs.receiveMessageAsync( receiveRequest, new AsyncHandler<ReceiveMessageRequest, ReceiveMessageResult>() { @Override public void onSuccess(ReceiveMessageRequest request, ReceiveMessageResult result) { List<Message> messages = result.getMessages(); System.out.println("Received " + messages.size() + " messages"); for (Message message : messages) { System.out.println("Message ID: " + message.getMessageId()); System.out.println("Body: " + message.getBody()); } } @Override public void onError(Exception e) { System.err.println("Error receiving messages: " + e.getMessage()); e.printStackTrace(); } } );
CompletableFuture<ReceiveMessageResponse> completableFuture = sqsAsyncBatchManager .receiveMessage(ReceiveMessageRequest.builder() .queueUrl(queueUrl).build()) .whenComplete((receiveMessageResponse, throwable) -> { if (throwable != null) { System.err.println("Error receiving messages: " + throwable.getMessage()); throwable.printStackTrace(); } else { List<Message> messages = receiveMessageResponse.messages(); System.out.println("Received " + messages.size() + " messages"); for (Message message : messages) { System.out.println("Message ID: " + message.messageId()); System.out.println("Body: " + message.body()); } } });

主な設定パラメータ

パラメータ v1 v2
最大バッチサイズ maxBatchSize (デフォルトはバッチあたり 10 リクエスト) maxBatchSize (デフォルトはバッチあたり 10 リクエスト)
バッチ待機時間 maxBatchOpenMs (デフォルトは 200 ミリ秒) sendRequestFrequency (デフォルトは 200 ミリ秒)
可視性タイムアウト visibilityTimeoutSeconds (キューのデフォルトは -1) receiveMessageVisibilityTimeout (キューのデフォルト)
最小待機時間 longPollWaitTimeoutSeconds (longPoll が true の場合は 20 秒) receiveMessageMinWaitDuration (デフォルトは 50 ミリ秒)
メッセージ属性 ReceiveMessageRequest を使用して設定する receiveMessageAttributeNames (デフォルトではなし)
システム属性 ReceiveMessageRequest を使用して設定する receiveMessageSystemAttributeNames (デフォルトではなし)
ロングポーリング longPoll (デフォルトは true) サーバーがメッセージを送信するまでオープン接続が待機しないように、サポートされていません
ロングポーリングの最大待機時間 longPollWaitTimeoutSeconds (デフォルトは 20 秒) サーバーがメッセージを送信するまでオープン接続が待機しないように、サポートされていません
クライアント側に保存する、プリフェッチされる受信バッチの最大数。 maxDoneReceiveBatches (10 バッチ) 内部で処理されるため、サポートされていません
同時に処理されるアクティブな送信バッチの最大数 maxInflightOutboundBatches (デフォルトは 5 バッチ) 内部で処理されるため、サポートされていません
同時に処理されるアクティブな受信バッチの最大数 maxInflightReceiveBatches (デフォルトは 10 バッチ) 内部で処理されるため、サポートされていません