버전 1에서 버전 2로 자동 Amazon SQS 요청 배치 처리 변경 사항 - AWS SDK for Java 2.x

기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.

버전 1에서 버전 2로 자동 Amazon SQS 요청 배치 처리 변경 사항

이 주제에서는 AWS SDK for Java의 버전 1과 버전 2 간에 Amazon SQS에 대한 자동 요청 배치 처리의 변경 사항을 자세히 설명합니다.

높은 수준의 변경 사항

AWS SDK for Java 1.x는 요청 배치 처리를 위해 명시적으로 초기화해야 하는 별도의 AmazonSQSBufferedAsyncClient 클래스를 사용하여 클라이언트 측 버퍼링을 수행합니다.

AWS SDK for Java 2.x는 SqsAsyncBatchManager를 사용하여 버퍼링 기능을 간소화하고 개선합니다. 이 인터페이스의 구현은 표준 SqsAsyncClient와 직접 통합된 자동 요청 배치 처리 기능을 제공합니다. v2의 SqsAsyncBatchManager에 대해 알아보려면 이 안내서의 AWS SDK for Java 2.x를 통해 Amazon SQS에서 자동 요청 배치 처리 사용 주제를 참조하세요.

변경 사항 v1 v2

Maven 종속성

<dependencyManagement> <dependencies> <dependency> <groupId>com.amazonaws</groupId> <artifactId>aws-java-sdk-bom</artifactId> <version>1.12.7821</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement> <dependencies> <dependency> <groupId>com.amazonaws</groupId> <artifactId>aws-java-sdk-sqs</artifactId> </dependency> </dependencies>
<dependencyManagement> <dependencies> <dependency> <groupId>software.amazon.awssdk</groupId> <artifactId>bom</artifactId> <version>2.31.152</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement> <dependencies> <dependency> <groupId>software.amazon.awssdk</groupId> <artifactId>sqs</artifactId> </dependency> </dependencies>
패키지 이름 com.amazonaws.services.sqs.buffered software.amazon.awssdk.services.sqs.batchmanager
클래스 이름

AmazonSQSBufferedAsyncClient

SqsAsyncBatchManager

1 최신 버전. 2 최신 버전.

자동 SQS 요청 배치 처리 사용

변경 사항 v1 v2
배치 관리자 만들기
AmazonSQSAsync sqsAsync = new AmazonSQSAsyncClient(); AmazonSQSAsync bufferedSqs = new AmazonSQSBufferedAsyncClient(sqsAsync);
SqsAsyncClient asyncClient = SqsAsyncClient.create(); SqsAsyncBatchManager sqsAsyncBatchManager = asyncClient.batchManager();
사용자 지정 구성을 사용하여 배치 관리자 만들기
AmazonSQSAsync sqsAsync = new AmazonSQSAsyncClient(); QueueBufferConfig queueBufferConfig = new QueueBufferConfig() .withMaxBatchOpenMs(200) .withMaxBatchSize(10) .withMinReceiveWaitTimeMs(1000) .withVisibilityTimeoutSeconds(20) .withReceiveMessageAttributeNames(messageAttributeValues); AmazonSQSAsync bufferedSqs = new AmazonSQSBufferedAsyncClient(sqsAsync, queueBufferConfig);
BatchOverrideConfiguration batchOverrideConfiguration = BatchOverrideConfiguration.builder() .sendRequestFrequency(Duration.ofMillis(200)) .maxBatchSize(10) .receiveMessageMinWaitDuration(Duration.ofMillis(1000)) .receiveMessageVisibilityTimeout(Duration.ofSeconds(20)) .receiveMessageSystemAttributeNames(messageSystemAttributeNames) .receiveMessageAttributeNames(messageAttributeValues) .build(); SqsAsyncBatchManager sqsAsyncBatchManager = SqsAsyncBatchManager.builder() .overrideConfiguration(batchOverrideConfiguration) .client(SqsAsyncClient.create()) .scheduledExecutor(Executors.newScheduledThreadPool(8)) .build();
메시지 전송
Future<SendMessageResult> sendResultFuture = bufferedSqs.sendMessageAsync(new SendMessageRequest() .withQueueUrl(queueUrl) .withMessageBody(body));
CompletableFuture<SendMessageResponse> sendCompletableFuture = sqsAsyncBatchManager.sendMessage( SendMessageRequest.builder() .queueUrl(queueUrl) .messageBody(body) .build());
메시지 삭제
Future<DeleteMessageResult> deletResultFuture = bufferedSqs.deleteMessageAsync(new DeleteMessageRequest() .withQueueUrl(queueUrl));
CompletableFuture<DeleteMessageResponse> deleteResultCompletableFuture = sqsAsyncBatchManager.deleteMessage( DeleteMessageRequest.builder() .queueUrl(queueUrl) .build());
메시지의 표시 여부 변경
Future<ChangeMessageVisibilityResult> changeVisibilityResultFuture = bufferedSqs.changeMessageVisibilityAsync (new ChangeMessageVisibilityRequest() .withQueueUrl(queueUrl) .withVisibilityTimeout(20));
CompletableFuture<ChangeMessageVisibilityResponse> changeResponseCompletableFuture = sqsAsyncBatchManager.changeMessageVisibility( ChangeMessageVisibilityRequest.builder() .queueUrl(queueUrl) .visibilityTimeout(20) .build());
메시지 수신
ReceiveMessageResult receiveResult = bufferedSqs.receiveMessage( new ReceiveMessageRequest() .withQueueUrl(queueUrl));
CompletableFuture<ReceiveMessageResponse> responseCompletableFuture = sqsAsyncBatchManager.receiveMessage( ReceiveMessageRequest.builder() .queueUrl(queueUrl) .build());

비동기식 반환 유형 차이점

변경 사항 v1 v2
반환 타입 Future<ResultType> CompletableFuture<ResponseType>
콜백 메커니즘 별도의 onSuccessonError 메서드가 있는 AsyncHandler 필요 whenComplete(), thenCompose(), thenApply() 등 JDK에서 제공하는 CompletableFuture API 사용
예외 처리 AsyncHandler#onError() 메서드 사용 exceptionally(), handle(), whenComplete() 등 JDK에서 제공하는 CompletableFuture API 사용
취소 Future.cancel()을 통한 기본 지원 상위 CompletableFuture를 취소하면 체인의 모든 종속 future 자동 취소

비동기식 완료 처리 차이점

변경 사항 v1 v2
응답 핸들러 구현
Future<ReceiveMessageResult> future = bufferedSqs.receiveMessageAsync( receiveRequest, new AsyncHandler<ReceiveMessageRequest, ReceiveMessageResult>() { @Override public void onSuccess(ReceiveMessageRequest request, ReceiveMessageResult result) { List<Message> messages = result.getMessages(); System.out.println("Received " + messages.size() + " messages"); for (Message message : messages) { System.out.println("Message ID: " + message.getMessageId()); System.out.println("Body: " + message.getBody()); } } @Override public void onError(Exception e) { System.err.println("Error receiving messages: " + e.getMessage()); e.printStackTrace(); } } );
CompletableFuture<ReceiveMessageResponse> completableFuture = sqsAsyncBatchManager .receiveMessage(ReceiveMessageRequest.builder() .queueUrl(queueUrl).build()) .whenComplete((receiveMessageResponse, throwable) -> { if (throwable != null) { System.err.println("Error receiving messages: " + throwable.getMessage()); throwable.printStackTrace(); } else { List<Message> messages = receiveMessageResponse.messages(); System.out.println("Received " + messages.size() + " messages"); for (Message message : messages) { System.out.println("Message ID: " + message.messageId()); System.out.println("Body: " + message.body()); } } });

주요 구성 파라미터

파라미터 v1 v2
최대 배치 크기 maxBatchSize(배치당 기본 요청 10개) maxBatchSize(배치당 기본 요청 10개)
배치 대기 시간 maxBatchOpenMs(기본값 200ms) sendRequestFrequency(기본값 200ms)
표시 제한 시간 visibilityTimeoutSeconds(대기열 기본값의 경우 -1) receiveMessageVisibilityTimeout(기본 대기열)
최소 대기 시간 longPollWaitTimeoutSeconds(longPoll이 true인 경우 20초) receiveMessageMinWaitDuration(기본값 50ms)
메시지 속성 ReceiveMessageRequest를 사용하여 설정 receiveMessageAttributeNames(기본적으로 없음)
시스템 속성 ReceiveMessageRequest를 사용하여 설정 receiveMessageSystemAttributeNames(기본적으로 없음)
긴 폴링 longPoll(기본값 true) 서버가 메시지를 보낼 때까지 대기 중인 열린 연결을 방지하기 위해 지원되지 않음
긴 폴링의 최대 대기 시간 longPollWaitTimeoutSeconds(기본값 20초) 서버가 메시지를 보낼 때까지 대기 중인 열린 연결을 방지하기 위해 지원되지 않음
클라이언트 측에서 미리 가져와서 저장하는 수신 배치의 최대 개수 maxDoneReceiveBatches(배치 10개) 내부적으로 처리되므로 지원되지 않음
동시에 처리되는 최대 활성 아웃바운드 배치 수 maxInflightOutboundBatches(기본값 배치 5개) 내부적으로 처리되므로 지원되지 않음
동시에 처리되는 최대 활성 수신 배치 수 maxInflightReceiveBatches(기본값 배치 10개) 내부적으로 처리되므로 지원되지 않음