翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。
で Amazon SQS の自動リクエストバッチ処理を使用する AWS SDK for Java 2.x
Amazon SQS の自動リクエストバッチ処理 API は、SQS オペレーションのリクエストをバッチ処理およびバッファ処理する効率的な方法を提供する高レベルライブラリです。バッチ処理 API を使用すると、SQS へのリクエストの数を減らすことができ、スループットが向上し、コストが最小限に抑えられます。
バッチ API メソッドは、sendMessage、、 changeMessageVisibilitySqsAsyncClientメソッドと一致するreceiveMessageためdeleteMessage、最小限の変更でドロップイン置換としてバッチ API を使用できます。
このトピックでは、Amazon SQS の自動リクエストバッチ処理 API を設定して使用する方法の概要を説明します。
前提条件をチェックする
バッチ API にアクセスするには、SDK for Java 2.x のバージョン 2.28.0 以降を使用する必要があります。Maven には、少なくとも次の要素が含まれているpom.xml必要があります。
<dependencyManagement>
<dependencies>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>bom</artifactId>
<version>2.28.231</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>sqs</artifactId>
</dependency>
</dependencies>
1 最新バージョン
バッチマネージャーを作成する
自動リクエストバッチ API は、SqsAsyncBatchManager
を使用したデフォルト設定 SqsAsyncClient
バッチマネージャーを作成する最も簡単な方法は、既存の SqsAsyncClientbatchManagerファクトリメソッドを呼び出すことです。簡単なアプローチを次のスニペットに示します。
SqsAsyncClient asyncClient = SqsAsyncClient.create(); SqsAsyncBatchManager sqsAsyncBatchManager = asyncClient.batchManager();
このアプローチを使用すると、SqsAsyncBatchManagerインスタンスは の設定を上書きする SqsAsyncBatchManagerセクションの表に示されているデフォルト値を使用します。さらに、SqsAsyncBatchManagerインスタンスは、作成されたSqsAsyncClientインスタンスExecutorServiceの を使用します。
を使用したカスタム設定 SqsAsyncBatchManager.Builder
より高度なユースケースでは、 を使用してバッチマネージャーをカスタマイズできますSqsAsyncBatchManager.BuilderSqsAsyncBatchManagerインスタンスを作成すると、バッチ処理の動作を微調整できます。次のスニペットは、ビルダーを使用してバッチ処理の動作をカスタマイズする方法の例を示しています。
SqsAsyncBatchManager batchManager = SqsAsyncBatchManager.builder() .client(SqsAsyncClient.create()) .scheduledExecutor(Executors.newScheduledThreadPool(5)) .overrideConfiguration(b -> b .receiveMessageMinWaitDuration(Duration.ofSeconds(10)) .receiveMessageVisibilityTimeout(Duration.ofSeconds(1)) .receiveMessageAttributeNames(Collections.singletonList("*")) .receiveMessageSystemAttributeNames(Collections.singletonList(MessageSystemAttributeName.ALL))) .build();
このアプローチを使用すると、 の設定を上書きする SqsAsyncBatchManagerセクションの表に表示される BatchOverrideConfiguration オブジェクトの設定を調整できます。このアプローチを使用して、バッチマネージャーScheduledExecutorService
メッセージを送信する
バッチマネージャーでメッセージを送信するには、 SqsAsyncBatchManager#sendMessageメソッドを使用します。SDK はリクエストをバッファし、 maxBatchSizeまたは sendRequestFrequencyの値に達するとバッチとして送信します。
次の例は、別のsendMessageリクエストの直後のリクエストを示しています。この場合、SDK は両方のメッセージを 1 つのバッチで送信します。
// Sending the first message CompletableFuture<SendMessageResponse> futureOne = sqsAsyncBatchManager.sendMessage(r -> r.messageBody("One").queueUrl("queue")); // Sending the second message CompletableFuture<SendMessageResponse> futureTwo = sqsAsyncBatchManager.sendMessage(r -> r.messageBody("Two").queueUrl("queue")); // Waiting for both futures to complete and retrieving the responses SendMessageResponse messageOne = futureOne.join(); SendMessageResponse messageTwo = futureTwo.join();
メッセージの可視性タイムアウトを変更する
SqsAsyncBatchManager#changeMessageVisibilitymaxBatchSizeまたは sendRequestFrequencyの値に達するとバッチとして送信します。
次の例は、 changeMessageVisibilityメソッドを呼び出す方法を示しています。
CompletableFuture<ChangeMessageVisibilityResponse> futureOne = sqsAsyncBatchManager.changeMessageVisibility(r -> r.receiptHandle("receiptHandle") .queueUrl("queue")); ChangeMessageVisibilityResponse response = futureOne.join();
メッセージの削除
SqsAsyncBatchManager#deleteMessagemaxBatchSizeまたは sendRequestFrequencyの値に達するとバッチとして送信します。
次の例は、 deleteMessageメソッドを呼び出す方法を示しています。
CompletableFuture<DeleteMessageResponse> futureOne = sqsAsyncBatchManager.deleteMessage(r -> r.receiptHandle("receiptHandle") .queueUrl("queue")); DeleteMessageResponse response = futureOne.join();
メッセージを受信する
デフォルト設定を使用する
アプリケーションで SqsAsyncBatchManager#receiveMessageメソッドをポーリングすると、バッチマネージャーは内部バッファからメッセージを取得し、SDK は自動的にバックグラウンドで更新します。
次の例は、 receiveMessageメソッドを呼び出す方法を示しています。
CompletableFuture<ReceiveMessageResponse> responseFuture = sqsAsyncBatchManager.receiveMessage(r -> r.queueUrl("queueUrl"));
カスタム設定を使用する
例えば、カスタム待機時間を設定し、取得するメッセージの数を指定してリクエストをさらにカスタマイズする場合は、次の例に示すようにリクエストをカスタマイズできます。
CompletableFuture<ReceiveMessageResponse> response = sqsAsyncBatchManager.receiveMessage(r -> r.queueUrl("queueUrl") .waitTimeSeconds(5) .visibilityTimeout(20));
注記
次のいずれかのパラメータReceiveMessageRequestを含む receiveMessageで を呼び出すと、SDK はバッチマネージャーをバイパスし、通常の非同期receiveMessageリクエストを送信します。
-
messageAttributeNames -
messageSystemAttributeNames -
messageSystemAttributeNamesWithStrings -
overrideConfiguration
の設定を上書きする SqsAsyncBatchManager
SqsAsyncBatchManager インスタンスを作成するときに、次の設定を調整できます。次の設定のリストは、 で利用できますBatchOverrideConfiguration.Builder
| 設定 | 説明 | デフォルト値 |
|---|---|---|
maxBatchSize |
各 、SendMessageBatchRequest、ChangeMessageVisibilityBatchRequestまたは のバッチあたりのリクエストの最大数DeleteMessageBatchRequest。最大値は 10 です。 |
10 |
sendRequestFrequency |
|
200 ミリ秒 |
receiveMessageVisibilityTimeout |
メッセージの可視性タイムアウト。設定されていない場合は、キューのデフォルトが使用されます。 | キューのデフォルト |
receiveMessageMinWaitDuration |
receiveMessage リクエストの最小待機時間。CPU の無駄を防ぐため、 を 0 に設定しないでください。 |
50 ミリ秒 |
receiveMessageSystemAttributeNames |
receiveMessage 呼び出しをリクエストするシステム属性名 |
なし |
receiveMessageAttributeNames |
receiveMessage 呼び出しをリクエストする属性名のリスト。 |
なし |