AWS SDK for Java 2.x를 사용한 비동기식 프로그래밍 작업
AWS SDK for Java 2.x는 일부 스레드에 높은 동시성을 구현하는 비차단형 입출력 지원이 포함된 비동기식 클라이언트가 특징입니다. 그러나 전체 비차단 입출력은 보장되지 않습니다. 비동기식 클라이언트는 자격 증명 검색, AWS Signature Version 4(SigV4)를 사용한 요청 서명 또는 엔드포인트 검색과 같은 경우에 차단 호출을 수행할 수 있습니다.
비동기 메서드는 클라이언트가 서비스로부터 응답을 받을 때까지 스레드의 실행을 차단합니다. 비동기 메서드는 (값을) 즉시 반환하며, 응답을 기다리지 않고 제어 권한을 호출 스레드에 넘겨줍니다.
비동기 메서드는 응답이 제공되기 전에 (값을) 반환하므로 준비되었을 때 응답을 가져올 방법이 필요합니다. AWS SDK for Java의 2.x에서 비동기식 클라이언트 메서드는 준비가 되었을 때 응답에 액세스하도록 허용하는 CompletableFuture 객체를 반환합니다.
비동기식 클라이언트 API 사용
비동기식 클라이언트 메서드의 서명은 동기식 클라이언트 메서드와 동일하지만 비동기식 메서드는 향후 비동기식 작업의 결과가 포함된 CompletableFutureCompletionException으로 발생합니다.
결과를 얻는 데 사용할 수 있는 한 가지 접근 방식은 SDK 메서드 호출에서 반환된 whenComplete()에 CompletableFuture 메서드를 연결하는 것입니다. whenComplete() 메서드는 비동기식 호출이 완료된 방식에 따라 결과 또는 CompletionException 유형의 Throwable 객체를 수신합니다. 호출 코드로 반환되기 전에 결과를 처리하거나 확인하는 작업을 whenComplete()에 제공합니다.
SDK 메서드에서 반환한 객체 이외의 다른 것을 반환하려면 handle() 메서드를 대신 사용합니다. handle() 메서드는 whenComplete()와 동일한 파라미터를 사용하지만 결과를 처리하고 객체를 반환할 수 있습니다.
비동기식 체인이 완료되고 완료 결과를 검색할 때까지 기다리기 위해 join() 메서드를 호출할 수 있습니다. Throwable 객체가 체인에서 처리되지 않은 경우 join() 메서드는 원래 예외를 래핑하는 선택되지 않은CompletionException을 발생시킵니다. CompletionException#getCause()를 사용하여 원래 예외에 액세스합니다. 또한 CompletableFuture#get() 메서드를 호출하여 완료 결과를 가져올 수 있습니다. 그러나 get() 메서드는 확인된 예외를 발생시킬 수 있습니다.
다음 예제에서는 DynamoDB 비동기식 클라이언트의 listTables() 메서드를 사용하는 방법에 대한 2가지 변형을 보여줍니다. whenComplete()에 전달된 작업은 성공적인 응답을 로깅하는 반면, handle() 버전은 테이블 이름 목록을 추출하고 목록을 반환합니다. 두 경우 모두 비동기식 체인에서 오류가 생성되는 경우 클라이언트 코드가 오류를 처리할 수 있도록 오류를 다시 발생시킵니다.
가져옵니다.
import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; import software.amazon.awssdk.services.dynamodb.model.ListTablesRequest; import software.amazon.awssdk.services.dynamodb.model.ListTablesResponse; import java.util.List; import java.util.concurrent.CompletableFuture;
코드
비동기식 메서드에서 스트리밍 처리
스트리밍 콘텐츠에서 비동기식 메서드를 사용할 경우 증분 방식으로 콘텐츠를 제공하기 위해 AsyncRequestBody
다음 예제는 PutObject 작업의 비동기식 양식을 사용하여 파일을 Amazon S3에 비동기식으로 업로드합니다.
가져옵니다.
import software.amazon.awssdk.core.async.AsyncRequestBody; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.s3.S3AsyncClient; import software.amazon.awssdk.services.s3.model.PutObjectRequest; import software.amazon.awssdk.services.s3.model.PutObjectResponse; import java.nio.file.Paths; import java.util.concurrent.CompletableFuture;
코드
/** * To run this AWS code example, ensure that you have setup your development environment, including your AWS credentials. * * For information, see this documentation topic: * * https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/get-started.html */ public class S3AsyncOps { public static void main(String[] args) { final String USAGE = "\n" + "Usage:\n" + " S3AsyncOps <bucketName> <key> <path>\n\n" + "Where:\n" + " bucketName - the name of the Amazon S3 bucket (for example, bucket1). \n\n" + " key - the name of the object (for example, book.pdf). \n" + " path - the local path to the file (for example, C:/AWS/book.pdf). \n" ; if (args.length != 3) { System.out.println(USAGE); System.exit(1); } String bucketName = args[0]; String key = args[1]; String path = args[2]; Region region = Region.US_WEST_2; S3AsyncClient client = S3AsyncClient.builder() .region(region) .build(); PutObjectRequest objectRequest = PutObjectRequest.builder() .bucket(bucketName) .key(key) .build(); // Put the object into the bucket CompletableFuture<PutObjectResponse> future = client.putObject(objectRequest, AsyncRequestBody.fromFile(Paths.get(path)) ); future.whenComplete((resp, err) -> { try { if (resp != null) { System.out.println("Object uploaded. Details: " + resp); } else { // Handle error err.printStackTrace(); } } finally { // Only close the client when you are completely done with it client.close(); } }); future.join(); } }
다음 예제는 Amazon S3에서 GetObject 작업의 비동기식 양식을 사용해 파일을 가져옵니다.
가져옵니다.
import software.amazon.awssdk.core.async.AsyncResponseTransformer; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.s3.S3AsyncClient; import software.amazon.awssdk.services.s3.model.GetObjectRequest; import software.amazon.awssdk.services.s3.model.GetObjectResponse; import java.nio.file.Paths; import java.util.concurrent.CompletableFuture;
코드
/** * To run this AWS code example, ensure that you have setup your development environment, including your AWS credentials. * * For information, see this documentation topic: * * https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/get-started.html */ public class S3AsyncStreamOps { public static void main(String[] args) { final String USAGE = "\n" + "Usage:\n" + " S3AsyncStreamOps <bucketName> <objectKey> <path>\n\n" + "Where:\n" + " bucketName - the name of the Amazon S3 bucket (for example, bucket1). \n\n" + " objectKey - the name of the object (for example, book.pdf). \n" + " path - the local path to the file (for example, C:/AWS/book.pdf). \n" ; if (args.length != 3) { System.out.println(USAGE); System.exit(1); } String bucketName = args[0]; String objectKey = args[1]; String path = args[2]; Region region = Region.US_WEST_2; S3AsyncClient client = S3AsyncClient.builder() .region(region) .build(); GetObjectRequest objectRequest = GetObjectRequest.builder() .bucket(bucketName) .key(objectKey) .build(); CompletableFuture<GetObjectResponse> futureGet = client.getObject(objectRequest, AsyncResponseTransformer.toFile(Paths.get(path))); futureGet.whenComplete((resp, err) -> { try { if (resp != null) { System.out.println("Object downloaded. Details: "+resp); } else { err.printStackTrace(); } } finally { // Only close the client when you are completely done with it client.close(); } }); futureGet.join(); } }
고급 비동기 옵션 구성
AWS SDK for Java 2.x는 비동기식 이벤트 중심 네트워크 애플리케이션 프레임워크인 NettyExecutorService를 생성하여 HTTP 클라이언트 요청에서 Netty 클라이언트로 반환된 선물을 완료합니다. 이러한 추상화는 개발자가 스레드 중지 또는 대기를 선택하는 경우 애플리케이션이 비동기 프로세스를 차단하는 위험을 줄여줍니다. 기본적으로 각 비동기 클라이언트는 프로세서 수에 따라 스레드 풀을 만들고 ExecutorService 내에서 대기열의 작업을 관리합니다.
비동기식 클라이언트를 구축할 경우 ExecutorService의 특정 JDK 구현을 지정할 수 있습니다. 다음 코드 조각은 고정된 수의 스레드로 ExecutorService를 만듭니다.
코드
S3AsyncClient clientThread = S3AsyncClient.builder() .asyncConfiguration( b -> b.advancedOption(SdkAdvancedAsyncClientOption .FUTURE_COMPLETION_EXECUTOR, Executors.newFixedThreadPool(10) ) ) .build();
성능을 최적화하려면 자체 스레드 풀 실행 도구를 관리하고 클라이언트를 구성할 때 이를 포함시킬 수 있습니다.
ThreadPoolExecutor executor = new ThreadPoolExecutor(50, 50, 10, TimeUnit.SECONDS, new LinkedBlockingQueue<>(<custom_value>), new ThreadFactoryBuilder() .threadNamePrefix("sdk-async-response").build()); // Allow idle core threads to time out executor.allowCoreThreadTimeOut(true); S3AsyncClient clientThread = S3AsyncClient.builder() .asyncConfiguration( b -> b.advancedOption(SdkAdvancedAsyncClientOption .FUTURE_COMPLETION_EXECUTOR, executor ) ) .build();