使用AWS SDK for Java 2.x 异步编程 - AWS SDK for Java 2.x

使用AWS SDK for Java 2.x 异步编程

AWS SDK for Java 2.x 提供了异步客户端,这些客户端支持非阻塞 I/O,可以在几个线程中实现高并发。然而,并不能保证完全实现非阻塞 I/O。异步客户端在某些情况下仍可能执行阻塞调用,这些情况包括凭证检索、使用AWS签名版本 4(SigV4)请求签名或端点发现。

同步方法会阻止执行您的线程,直到客户端接收到服务的响应。异步方法会立即返回,并控制调用的线程,而不必等待响应。

由于异步方法在收到响应之前返回,所以需要通过某种方法在响应准备就绪时接收响应。适用于 Java 的 AWS SDK 2.x 中的异步客户端方法将返回 CompletableFuture 对象,这些对象可让您在响应准备就绪时访问响应。

使用异步客户端 API

异步客户端方法的签名与其对应同步方法的签名相同,但是异步方法返回 CompletableFuture 对象,该对象在未来会包含异步操作的结果。如果在 SDK 的异步方法执行时发生错误,则错误会作为 CompletionException 引发。

您可以用来获取结果的一种方法是,将一个 whenComplete() 方法链接到 SDK 方法调用所返回的 CompletableFuturewhenComplete() 方法接收结果或 Throwable 类型的对象 CompletionException,具体取决于完成异步调用的方式。您向 whenComplete() 提供一个操作,用于在结果返回给调用代码之前,对结果进行处理或检查。

如果要返回的不是 SDK 方法原本返回的对象,而是其他内容,请改用 handle() 方法。handle() 方法采用的参数与 whenComplete() 相同,但您可以处理结果并返回对象。

要等待异步链完成并检索完成结果,可以调用 join() 方法。如果未在链中处理 Throwable 对象,则 join() 方法会抛出封装了原始异常的 CompletionException。您可以使用 CompletionException#getCause() 访问原始异常。您也可以调用 CompletableFuture#get() 方法来获取完成结果。但 get() 方法会抛出受检异常。

以下示例显示了两种变体,说明如何使用 DynamoDB 异步客户端的 listTables() 方法。传递给 whenComplete() 的操作仅记录成功的响应,而 handle() 版本则提取表名列表并返回列表。在这两种情况下,如果异步链中生成错误,则会重新抛出错误,以便客户端代码有机会对其进行处理。

导入

import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; import software.amazon.awssdk.services.dynamodb.model.ListTablesRequest; import software.amazon.awssdk.services.dynamodb.model.ListTablesResponse; import java.util.List; import java.util.concurrent.CompletableFuture;

代码

whenComplete() variation
public class DynamoDbAsyncListTables { public static void main(String[] args) { Region region = Region.US_EAST_1; DynamoDbAsyncClient dynamoDbAsyncClient = DynamoDbAsyncClient.builder().region(region).build(); try { ListTablesResponse listTablesResponse = listTablesWhenComplete(dynamoDbAsyncClient).join(); // The join() method may throw a CompletionException. if (listTablesResponse.hasTableNames()){ System.out.println("Table exist in this region: " + region.id()); } } catch (RuntimeException e) { // Handle as needed. Here we simply print out the class names. System.out.println(e.getClass()); // Prints 'class java.util.concurrent.CompletionException'. System.out.println(e.getCause().getClass()); // Prints 'class software.amazon.awssdk.services.dynamodb.model.DynamoDbException'. } } public static CompletableFuture<ListTablesResponse> listTablesWhenComplete(DynamoDbAsyncClient client) { return client.listTables(ListTablesRequest.builder().build()) .whenComplete((listTablesResponse, throwable) -> { if (listTablesResponse != null) { // Consume the response. System.out.println("The SDK's listTables method completed successfully."); } else { RuntimeException cause = (RuntimeException) throwable.getCause(); // If an error was thrown during the SDK's listTables method it is wrapped in a CompletionException. // The SDK throws only RuntimeExceptions, so this is a safe cast. System.out.println(cause.getMessage()); // Log error here, but rethrow so the calling code can handle as needed. throw cause; } }); }
handle() variation
public class DynamoDbAsyncListTables { public static void main(String[] args) { Region region = Region.US_EAST_1; DynamoDbAsyncClient dynamoDbAsyncClient = DynamoDbAsyncClient.builder().region(region).build(); try { List<String> tableNames = listTablesHandle(dynamoDbAsyncClient).join(); // The join() method may throw a CompletionException. tableNames.forEach(System.out::println); } catch (RuntimeException e) { // Handle as needed. Here we simply print out the class names. System.out.println(e.getClass()); // Prints 'class java.util.concurrent.CompletionException'. System.out.println(e.getCause().getClass()); // Prints 'class software.amazon.awssdk.services.dynamodb.model.DynamoDbException'. } } public static CompletableFuture<List<String>> listTablesHandle(DynamoDbAsyncClient client) { return client.listTables(ListTablesRequest.builder().build()) .handle((listTablesResponse, throwable) -> { if (listTablesResponse != null) { return listTablesResponse.tableNames(); // Return the list of table names. } else { RuntimeException cause = (RuntimeException) throwable.getCause(); // If an error was thrown during the SDK's listTables method it is wrapped in a CompletionException. // The SDK throws only RuntimeExceptions, so this is a safe cast. System.out.println(cause.getMessage()); // Log error here, but rethrow so the calling code can handle as needed. throw cause; } }); } }

使用异步方法处理流

对于包含流式内容的异步方法,必须提供 AsyncRequestBody,以便以递增方式提供内容,或者提供 AsyncResponseTransformer 以接收和处理响应。

以下示例使用 PutObject 操作的异步形式,以异步方式将文件上传到 Amazon S3。

导入

import software.amazon.awssdk.core.async.AsyncRequestBody; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.s3.S3AsyncClient; import software.amazon.awssdk.services.s3.model.PutObjectRequest; import software.amazon.awssdk.services.s3.model.PutObjectResponse; import java.nio.file.Paths; import java.util.concurrent.CompletableFuture;

代码

/** * To run this AWS code example, ensure that you have setup your development environment, including your AWS credentials. * * For information, see this documentation topic: * * https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/get-started.html */ public class S3AsyncOps { public static void main(String[] args) { final String USAGE = "\n" + "Usage:\n" + " S3AsyncOps <bucketName> <key> <path>\n\n" + "Where:\n" + " bucketName - the name of the Amazon S3 bucket (for example, bucket1). \n\n" + " key - the name of the object (for example, book.pdf). \n" + " path - the local path to the file (for example, C:/AWS/book.pdf). \n" ; if (args.length != 3) { System.out.println(USAGE); System.exit(1); } String bucketName = args[0]; String key = args[1]; String path = args[2]; Region region = Region.US_WEST_2; S3AsyncClient client = S3AsyncClient.builder() .region(region) .build(); PutObjectRequest objectRequest = PutObjectRequest.builder() .bucket(bucketName) .key(key) .build(); // Put the object into the bucket CompletableFuture<PutObjectResponse> future = client.putObject(objectRequest, AsyncRequestBody.fromFile(Paths.get(path)) ); future.whenComplete((resp, err) -> { try { if (resp != null) { System.out.println("Object uploaded. Details: " + resp); } else { // Handle error err.printStackTrace(); } } finally { // Only close the client when you are completely done with it client.close(); } }); future.join(); } }

以下示例使用 GetObject 操作的异步形式,从 Amazon S3 获取文件。

导入

import software.amazon.awssdk.core.async.AsyncResponseTransformer; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.s3.S3AsyncClient; import software.amazon.awssdk.services.s3.model.GetObjectRequest; import software.amazon.awssdk.services.s3.model.GetObjectResponse; import java.nio.file.Paths; import java.util.concurrent.CompletableFuture;

代码

/** * To run this AWS code example, ensure that you have setup your development environment, including your AWS credentials. * * For information, see this documentation topic: * * https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/get-started.html */ public class S3AsyncStreamOps { public static void main(String[] args) { final String USAGE = "\n" + "Usage:\n" + " S3AsyncStreamOps <bucketName> <objectKey> <path>\n\n" + "Where:\n" + " bucketName - the name of the Amazon S3 bucket (for example, bucket1). \n\n" + " objectKey - the name of the object (for example, book.pdf). \n" + " path - the local path to the file (for example, C:/AWS/book.pdf). \n" ; if (args.length != 3) { System.out.println(USAGE); System.exit(1); } String bucketName = args[0]; String objectKey = args[1]; String path = args[2]; Region region = Region.US_WEST_2; S3AsyncClient client = S3AsyncClient.builder() .region(region) .build(); GetObjectRequest objectRequest = GetObjectRequest.builder() .bucket(bucketName) .key(objectKey) .build(); CompletableFuture<GetObjectResponse> futureGet = client.getObject(objectRequest, AsyncResponseTransformer.toFile(Paths.get(path))); futureGet.whenComplete((resp, err) -> { try { if (resp != null) { System.out.println("Object downloaded. Details: "+resp); } else { err.printStackTrace(); } } finally { // Only close the client when you are completely done with it client.close(); } }); futureGet.join(); } }

配置高级异步选项

适用于 Java 的 AWS SDK 2.x 使用 Netty 处理 I/O 线程,这是一种异步的事件驱动网络应用程序框架。适用于 Java 的 AWS SDK 2.x 在 Netty 之后创建 ExecutorService,以完成从 HTTP 客户端请求返回到 Netty 客户端的 future。这种抽象化可以减少在客服人员选择停止或休眠线程时,应用程序中断异步处理的风险。默认情况下,每个异步客户端都会根据处理器数量创建一个线程池,并管理 ExecutorService 队列中的任务。

在构建异步客户端时,可以指定特定的 JDK 实现 ExecutorService。以下代码段创建了一个具有固定线程数的 ExecutorService

代码

S3AsyncClient clientThread = S3AsyncClient.builder() .asyncConfiguration( b -> b.advancedOption(SdkAdvancedAsyncClientOption .FUTURE_COMPLETION_EXECUTOR, Executors.newFixedThreadPool(10) ) ) .build();

要优化性能,您可以管理自己的线程池执行程序,并在配置客户端时包括该执行程序。

ThreadPoolExecutor executor = new ThreadPoolExecutor(50, 50, 10, TimeUnit.SECONDS, new LinkedBlockingQueue<>(<custom_value>), new ThreadFactoryBuilder() .threadNamePrefix("sdk-async-response").build()); // Allow idle core threads to time out executor.allowCoreThreadTimeOut(true); S3AsyncClient clientThread = S3AsyncClient.builder() .asyncConfiguration( b -> b.advancedOption(SdkAdvancedAsyncClientOption .FUTURE_COMPLETION_EXECUTOR, executor ) ) .build();