翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。
Amazon SQS でのクライアント側のバッファリングとリクエストのバッチ処理の有効化
AWS SDK for JavaAmazonSQSBufferedAsyncClientAmazon SQSにアクセスするもの。このクライアントにより、クライアント側のバッファリングを使用したシンプルなリクエストバッチ処理が可能になります。クライアントから行われた呼び出しは、最初にバッファされ、バッチリクエストとして Amazon SQS に送信されます。
クライアント側のバッファリングは最大10個のリクエストをバッファリングし、バッチリクエストとして送信できるため、Amazon SQSの利用コストを削減して、送信リクエストの数を減らすことができます。バッファリングは同期および非同期コールの両方をAmazonSQSBufferedAsyncClient バッファします。バッチ処理されたリクエストと ロングポーリングのサポートによって、スループットを向上させることもできます。詳細については、「Amazon SQS での水平スケーリングとアクションのバッチ処理を使用したスループットの向上」を参照してください。
AmazonSQSBufferedAsyncClientはAmazonSQSAsyncClientと同じインターフェイスを実行するため、AmazonSQSAsyncClient から AmazonSQSBufferedAsyncClient に移行するには通常既存のコードを最小限変更するだけです。
注記
Amazon SQS バッファリング非同期クライアントは現在 FIFOキューをサポートしていません。
AmazonSQSBufferedAsyncClient の使用
開始する前に、「Amazon SQSのセットアップ」のステップを完了します。
AWS SDK for Java 1.x
AWS SDK for Java 1.x では、次の例AmazonSQSBufferedAsyncClientに基づいて新しい を作成できます。
// Create the basic Amazon SQS async client final AmazonSQSAsync sqsAsync = new AmazonSQSAsyncClient(); // Create the buffered client final AmazonSQSAsync bufferedSqs = new AmazonSQSBufferedAsyncClient(sqsAsync);
新規作成したらAmazonSQSBufferedAsyncClient、これを使用してAmazon SQSに複数のリクエストを送信できます (AmazonSQSAsyncClient と同様です)。次に例を示します。
final CreateQueueRequest createRequest = new CreateQueueRequest().withQueueName("MyQueue"); final CreateQueueResult res = bufferedSqs.createQueue(createRequest); final SendMessageRequest request = new SendMessageRequest(); final String body = "Your message text" + System.currentTimeMillis(); request.setMessageBody( body ); request.setQueueUrl(res.getQueueUrl()); final Future<SendMessageResult> sendResult = bufferedSqs.sendMessageAsync(request); final ReceiveMessageRequest receiveRq = new ReceiveMessageRequest() .withMaxNumberOfMessages(1) .withQueueUrl(queueUrl); final ReceiveMessageResult rx = bufferedSqs.receiveMessage(receiveRq);
AmazonSQSBufferedAsyncClient の設定
AmazonSQSBufferedAsyncClient は、ほとんどのユースケースに合うように事前に設定されています。たとえば、AmazonSQSBufferedAsyncClient をさらに設定できます。'
-
必要な設定パラメータを使用して、
QueueBufferConfigクラスのインスタンスを作成します。 -
AmazonSQSBufferedAsyncClientコンストラクタにインスタンスを指定します。
// Create the basic Amazon SQS async client final AmazonSQSAsync sqsAsync = new AmazonSQSAsyncClient(); final QueueBufferConfig config = new QueueBufferConfig() .withMaxInflightReceiveBatches(5) .withMaxDoneReceiveBatches(15); // Create the buffered client final AmazonSQSAsync bufferedSqs = new AmazonSQSBufferedAsyncClient(sqsAsync, config);
| パラメータ | デフォルト値 | 説明 |
|---|---|---|
longPoll |
true |
|
longPollWaitTimeoutSeconds |
20 s |
空の受信結果を返すまでに、キュー内へのメッセージの出現をサーバーが待機するのを 注記ロングポーリングが無効になっている場合、この設定に効果はありません。 |
maxBatchOpenMs |
200ms |
送信呼び出しが、同じタイプのメッセージをバッチ処理する他の呼び出しを待機する最大ミリ秒。 設定を大きくすればするほど、同じ量の処理を実行するのに必要なバッチが少なくなります (ただし、バッチ内の最初の呼び出しは待機時間が長くなります)。 このパラメータを |
maxBatchSize |
バッチあたり 10 個のリクエスト |
1 つのバッチリクエストでまとめてバッチ処理されるメッセージの最大数。設定を大きくするほど、全体数が同じリクエストの処理に要するバッチ数が減ります。 注記バッチあたり 10 個のリクエストはAmazon SQSの最大許容値です。 |
maxBatchSizeBytes |
1 MiB |
クライアントがAmazon SQSに送信しようとするメッセージバッチの最大サイズ、バイト単位。 注記1 MiB は Amazon SQS の最大許容値です。 |
maxDoneReceiveBatches |
10 個のバッチ |
設定を大きくすればするほど、Amazon SQSを呼び出さなくても多くの受信リクエストを満たすことができます (ただし、プリフェッチされるメッセージが多くなるほど、バッファにとどまる時間が長くなるため、それ自体の可視性タイムアウトが発生する可能性があります)。 注記
|
maxInflightOutboundBatches |
5 個のバッチ |
同時に処理できるアクティブな送信バッチの最大数。 設定を大きくすればするほど、送信バッチの送信速度が速くなり (CPU や帯域幅などの他のクォータの影響を受けます)、 |
maxInflightReceiveBatches |
10 個のバッチ |
同時に処理できるアクティブな受信バッチの最大数。 設定を大きくすればするほど、受信するメッセージが増え (CPU や帯域幅などの他のクォータの影響を受けます)、 注記
|
visibilityTimeoutSeconds |
-1 |
このパラメータが 0 以外の正の値に設定されている場合、ここで設定した可視性タイムアウトにより、メッセージの処理元のキューで設定された可視性タイムアウトが上書きされます。 注記
可視性タイムアウトを |
AWS SDK for Java 2.x
AWS SDK for Java 2.x では、次の例SqsAsyncBatchManagerに基づいて新しい を作成できます。
// Create the basic Sqs Async Client SqsAsyncClient sqs = SqsAsyncClient.builder() .region(Region.US_EAST_1) .build(); // Create the batch manager SqsAsyncBatchManager sqsAsyncBatchManager = sqs.batchManager();
新規作成したらSqsAsyncBatchManager、これを使用してAmazon SQSに複数のリクエストを送信できます (SqsAsyncClient と同様です)。次に例を示します。
final String queueName = "MyAsyncBufferedQueue" + UUID.randomUUID(); final CreateQueueRequest request = CreateQueueRequest.builder().queueName(queueName).build(); final String queueUrl = sqs.createQueue(request).join().queueUrl(); System.out.println("Queue created: " + queueUrl); // Send messages CompletableFuture<SendMessageResponse> sendMessageFuture; for (int i = 0; i < 10; i++) { final int index = i; sendMessageFuture = sqsAsyncBatchManager.sendMessage( r -> r.messageBody("Message " + index).queueUrl(queueUrl)); SendMessageResponse response= sendMessageFuture.join(); System.out.println("Message " + response.messageId() + " sent!"); } // Receive messages with customized configurations CompletableFuture<ReceiveMessageResponse> receiveResponseFuture = customizedBatchManager.receiveMessage( r -> r.queueUrl(queueUrl) .waitTimeSeconds(10) .visibilityTimeout(20) .maxNumberOfMessages(10) ); System.out.println("You have received " + receiveResponseFuture.join().messages().size() + " messages in total."); // Delete messages DeleteQueueRequest deleteQueueRequest = DeleteQueueRequest.builder().queueUrl(queueUrl).build(); int code = sqs.deleteQueue(deleteQueueRequest).join().sdkHttpResponse().statusCode(); System.out.println("Queue is deleted, with statusCode " + code);
SqsAsyncBatchManager の設定
SqsAsyncBatchManager は、ほとんどのユースケースに合うように事前に設定されています。たとえば、SqsAsyncBatchManager をさらに設定できます。'
を使用したカスタム設定の作成SqsAsyncBatchManager.Builder:
SqsAsyncBatchManager customizedBatchManager = SqsAsyncBatchManager.builder() .client(sqs) .scheduledExecutor(Executors.newScheduledThreadPool(5)) .overrideConfiguration(b -> b .maxBatchSize(10) .sendRequestFrequency(Duration.ofMillis(200)) .receiveMessageMinWaitDuration(Duration.ofSeconds(10)) .receiveMessageVisibilityTimeout(Duration.ofSeconds(20)) .receiveMessageAttributeNames(Collections.singletonList("*")) .receiveMessageSystemAttributeNames(Collections.singletonList(MessageSystemAttributeName.ALL))) .build();
| パラメータ | デフォルト値 | 説明 |
|---|---|---|
maxBatchSize |
バッチあたり 10 個のリクエスト |
1 つのバッチリクエストでまとめてバッチ処理されるメッセージの最大数。設定を大きくするほど、全体数が同じリクエストの処理に要するバッチ数が減ります。 注記Amazon SQS の最大許容値は、バッチあたり 10 リクエストです。 |
sendRequestFrequency |
200ms |
送信呼び出しが、同じタイプのメッセージをバッチ処理する他の呼び出しを待機する最大ミリ秒。 設定を大きくすればするほど、同じ量の処理を実行するのに必要なバッチが少なくなります (ただし、バッチ内の最初の呼び出しは待機時間が長くなります)。 このパラメータを |
receiveMessageVisibilityTimeout |
-1 |
このパラメータが 0 以外の正の値に設定されている場合、ここで設定した可視性タイムアウトにより、メッセージの処理元のキューで設定された可視性タイムアウトが上書きされます。 注記 |
receiveMessageMinWaitDuration |
50 ミリ秒 |
|