AWS SDK for Java 2.x를 통해 Amazon SQS에서 자동 요청 배치 처리 사용
Amazon SQS용 Automatic Request Batching API는 SQS 작업에 대한 요청을 배치 처리 및 버퍼링하는 효율적인 방법을 제공하는 개괄적인 라이브러리입니다. 배치 처리 API를 사용하면 SQS에 대한 요청 수를 줄여 처리량을 개선하고 비용을 최소화할 수 있습니다.
배치 API 메서드는 SqsAsyncClient 메서드(sendMessage, changeMessageVisibility, deleteMessage, receiveMessage)와 일치하므로 최소한의 변경 사항으로 배치 API를 드롭인 대체로 사용할 수 있습니다.
이 주제에서는 Amazon SQS용 Automatic Request Batching API를 구성하고 사용하는 방법에 대한 개요를 제공합니다.
사전 조건 확인
배치 처리 API에 액세스하려면 SDK for Java 2.x의 버전 2.28.0 이상을 사용해야 합니다. Maven pom.xml에는 최소한 다음 요소가 포함되어야 합니다.
<dependencyManagement>
<dependencies>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>bom</artifactId>
<version>2.28.231</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>sqs</artifactId>
</dependency>
</dependencies>
1 최신 버전
배치 관리자 만들기
자동 요청 배치 처리 API는 SqsAsyncBatchManager
SqsAsyncClient를 사용한 기본 구성
배치 관리자를 만드는 가장 간단한 방법은 기존 SqsAsyncClientbatchManager 팩토리 메서드를 호출하는 것입니다. 이 접근 방법은 다음 코드 조각에 나와 있습니다.
SqsAsyncClient asyncClient = SqsAsyncClient.create(); SqsAsyncBatchManager sqsAsyncBatchManager = asyncClient.batchManager();
이 접근 방식을 사용할 경우 SqsAsyncBatchManager 인스턴스는 SqsAsyncBatchManager에 대한 구성 설정 재정의 섹션의 표에 표시된 기본값을 사용합니다. 또한 SqsAsyncBatchManager 인스턴스는 만들어진 SqsAsyncClient 인스턴스의 ExecutorService를 사용합니다.
SqsAsyncBatchManager.Builder를 사용한 사용자 지정 구성
고급 사용 사례의 경우 SqsAsyncBatchManager.BuilderSqsAsyncBatchManager 인스턴스를 만들면 배치 처리 동작을 미세 조정할 수 있습니다. 다음 코드 조각은 빌더를 사용하여 배치 처리 동작을 사용자 지정하는 방법의 예를 보여줍니다.
SqsAsyncBatchManager batchManager = SqsAsyncBatchManager.builder() .client(SqsAsyncClient.create()) .scheduledExecutor(Executors.newScheduledThreadPool(5)) .overrideConfiguration(b -> b .receiveMessageMinWaitDuration(Duration.ofSeconds(10)) .receiveMessageVisibilityTimeout(Duration.ofSeconds(1)) .receiveMessageAttributeNames(Collections.singletonList("*")) .receiveMessageSystemAttributeNames(Collections.singletonList(MessageSystemAttributeName.ALL))) .build();
이 접근 방식을 사용하면 SqsAsyncBatchManager에 대한 구성 설정 재정의 섹션의 표에 표시된 BatchOverrideConfiguration 객체의 설정을 조정할 수 있습니다. 이 접근 방식을 사용하여 배치 관리자에 사용자 지정 ScheduledExecutorService
메시지 전송
배치 관리자로 메시지를 보내려면 SqsAsyncBatchManager#sendMessage 메서드를 사용합니다. SDK는 요청을 버퍼링하고 maxBatchSize 또는 sendRequestFrequency 값에 도달하면 요청을 배치로 보냅니다.
다음 예제에서는 다른 요청 바로 다음에 오는 sendMessage 요청을 보여줍니다. 이 경우 SDK는 두 메시지를 단일 배치로 전송합니다.
// Sending the first message CompletableFuture<SendMessageResponse> futureOne = sqsAsyncBatchManager.sendMessage(r -> r.messageBody("One").queueUrl("queue")); // Sending the second message CompletableFuture<SendMessageResponse> futureTwo = sqsAsyncBatchManager.sendMessage(r -> r.messageBody("Two").queueUrl("queue")); // Waiting for both futures to complete and retrieving the responses SendMessageResponse messageOne = futureOne.join(); SendMessageResponse messageTwo = futureTwo.join();
메시지 표시 제한 시간 변경
SqsAsyncBatchManager#changeMessageVisibilitymaxBatchSize 또는 sendRequestFrequency 값에 도달하면 요청을 배치로 보냅니다.
다음 예제에는 changeMessageVisibility 메서드를 호출하는 방법이 나와 있습니다.
CompletableFuture<ChangeMessageVisibilityResponse> futureOne = sqsAsyncBatchManager.changeMessageVisibility(r -> r.receiptHandle("receiptHandle") .queueUrl("queue")); ChangeMessageVisibilityResponse response = futureOne.join();
메시지 삭제
SqsAsyncBatchManager#deleteMessagemaxBatchSize 또는 sendRequestFrequency 값에 도달하면 요청을 배치로 보냅니다.
다음 예제에서는 deleteMessage 메서드를 호출하는 방법을 보여줍니다.
CompletableFuture<DeleteMessageResponse> futureOne = sqsAsyncBatchManager.deleteMessage(r -> r.receiptHandle("receiptHandle") .queueUrl("queue")); DeleteMessageResponse response = futureOne.join();
메시지 수신
기본 설정 사용
애플리케이션에서 SqsAsyncBatchManager#receiveMessage 메서드를 폴링하면 배치 관리자가 내부 버퍼에서 메시지를 가져와 SDK가 백그라운드에서 자동으로 업데이트됩니다.
다음 예제에는 receiveMessage 메서드를 호출하는 방법이 나와 있습니다.
CompletableFuture<ReceiveMessageResponse> responseFuture = sqsAsyncBatchManager.receiveMessage(r -> r.queueUrl("queueUrl"));
사용자 지정 설정 사용
예를 들어 사용자 지정 대기 시간을 설정하고 검색할 메시지 수를 지정하여 요청을 추가로 사용자 지정하려면 다음 예제와 같이 요청을 사용자 지정할 수 있습니다.
CompletableFuture<ReceiveMessageResponse> response = sqsAsyncBatchManager.receiveMessage(r -> r.queueUrl("queueUrl") .waitTimeSeconds(5) .visibilityTimeout(20));
참고
다음 파라미터가 포함된 ReceiveMessageRequest를 사용하여 receiveMessage를 호출하면 SDK는 배치 관리자를 우회하고 일반 비동기식 receiveMessage 요청을 보냅니다.
-
messageAttributeNames -
messageSystemAttributeNames -
messageSystemAttributeNamesWithStrings -
overrideConfiguration
SqsAsyncBatchManager에 대한 구성 설정 재정의
SqsAsyncBatchManager 인스턴스를 만들 때 다음 설정을 조정할 수 있습니다. 다음 설정 목록을 BatchOverrideConfiguration.Builder
| 설정 | 설명 | 기본값 |
|---|---|---|
maxBatchSize |
각 SendMessageBatchRequest, ChangeMessageVisibilityBatchRequest 또는 DeleteMessageBatchRequest에 대한 배치당 최대 요청 수입니다. 최대값은 10입니다. |
10 |
sendRequestFrequency |
배치를 보내기 전까지의 시간이며, |
200ms |
receiveMessageVisibilityTimeout |
메시지에 대한 표시 제한 시간입니다. 설정되지 않으면 대기열의 기본값이 사용됩니다. | 대기열의 기본값 |
receiveMessageMinWaitDuration |
receiveMessage 요청의 최소 대기 시간입니다. CPU 낭비를 방지하기 위해 0으로 설정하는 것은 피합니다. |
50ms |
receiveMessageSystemAttributeNames |
receiveMessage 호출을 요청할 시스템 속성 이름 |
없음 |
receiveMessageAttributeNames |
receiveMessage 호출을 요청할 속성 이름 목록입니다. |
없음 |