将 Transfer Manager 从适用于 Java 的 AWS SDK 的版本 1 迁移到版本 2。
本迁移指南介绍了 Transfer Manager v1 和 S3 Transfer Manager v2 之间的主要差异,包括构造函数更改、方法映射和常见操作的代码示例。在了解这些差异之后,您可以成功迁移现有的 Transfer Manager 代码,以便充分利用 v2 中改进的性能和异步操作。
关于 AWS SDK 迁移工具
适用于 Java 的 AWS SDK 提供了一个自动迁移工具,可以将大部分 v1 Transfer Manager API 迁移到 v2。但是,该迁移工具不支持几项 v1 Transfer Manager 功能。对于这些情况,您需要按照本主题中的指导手动迁移 Transfer Manager 代码。
在本指南中,迁移状态显示了迁移工具是否可以自动迁移构造函数、方法或功能:
-
支持:迁移工具可以自动转换此代码
-
不支持:您需要手动迁移代码
即使对于标记为“支持”的项目,也要检查迁移结果并进行彻底测试。Transfer Manager 迁移涉及从同步操作到异步操作的重大架构更改。
概览
S3 Transfer Manager v2 对 Transfer Manager API 进行了重大更改。S3 Transfer Manager v2 基于异步操作构建,可提供更好的性能,尤其是在使用基于 AWS CRT 的 Amazon S3 客户端时。
主要区别
-
软件包:
com.amazonaws.services.s3.transfer→software.amazon.awssdk.transfer.s3 -
类名:
TransferManager→S3TransferManager -
客户端依赖项:同步 Amazon S3 客户端 → 异步 Amazon S3 客户端(
S3AsyncClient) -
架构:同步操作 → 使用
CompletableFuture的异步操作 -
性能:通过基于 AWS CRT 的客户端支持得到增强
高级别更改
| 方面 | V1 | V2 |
|---|---|---|
| Maven 依赖项 | aws-java-sdk-s3 |
s3-transfer-manager |
| 软件包 | com.amazonaws.services.s3.transfer |
software.amazon.awssdk.transfer.s3 |
| 主要的类 | TransferManager |
S3TransferManager |
| Amazon S3 客户端 | AmazonS3(同步) |
S3AsyncClient(异步) |
| 返回类型 | 阻塞操作 | CompletableFuture<T> |
Maven 依赖项
| V1 | V2 |
|---|---|
|
|
客户端构造函数迁移
支持的构造函数(自动迁移)
| V1 构造函数 | V2 等效项 | 迁移状态 |
|---|---|---|
new TransferManager() |
S3TransferManager.create() |
支持 |
TransferManagerBuilder.
defaultTransferManager() |
S3TransferManager.create() |
支持 |
TransferManagerBuilder.
standard().build() |
S3TransferManager.builder().build() |
支持 |
new TransferManager(AWSCredentials) |
S3TransferManager.builder()
.s3Client(S3AsyncClient.builder()
.credentialsProvider(...).build())
.build() |
支持 |
new TransferManager(
AWSCredentialsProvider) |
S3TransferManager.builder()
.s3Client(S3AsyncClient.builder()
.credentialsProvider(...).build())
.build() |
支持 |
不支持的构造函数(需要手动迁移)
| V1 构造函数 | V2 等效项 | 迁移说明 |
|---|---|---|
new TransferManager(AmazonS3) |
需要手动迁移 | 单独创建 S3AsyncClient |
new TransferManager(AmazonS3,
ExecutorService) |
需要手动迁移 | 创建 S3AsyncClient 并配置执行器 |
new TransferManager(AmazonS3,
ExecutorService, boolean) |
需要手动迁移 | 不支持 shutDownThreadPools 参数 |
手动迁移示例
V1 代码:
AmazonS3 s3Client = AmazonS3ClientBuilder.defaultClient();
TransferManager transferManager = new TransferManager(s3Client);
V2 代码:
// Create an `S3AsyncClient` with similar configuration
S3AsyncClient s3AsyncClient = S3AsyncClient.builder()
.credentialsProvider(DefaultCredentialsProvider.create())
.build();
// Provide the configured `S3AsyncClient` to the S3 transfer manager builder.
S3TransferManager transferManager = S3TransferManager.builder()
.s3Client(s3AsyncClient)
.build();
客户端方法迁移
目前,迁移工具支持基本 copy、download、upload、uploadDirectory、downloadDirectory、resumeDownload 和 resumeUpload 方法。
核心传输方法
| V1 方法 | V2 方法 | 返回类型更改 | 迁移状态 |
|---|---|---|---|
upload(String, String, File) |
uploadFile(UploadFileRequest) |
Upload → FileUpload |
支持 |
upload(PutObjectRequest) |
upload(UploadRequest) |
Upload → Upload |
支持 |
download(String, String, File) |
downloadFile(DownloadFileRequest) |
Download → FileDownload |
支持 |
download(GetObjectRequest, File) |
downloadFile(DownloadFileRequest) |
Download → FileDownload |
支持 |
copy(String, String, String, String) |
copy(CopyRequest) |
Copy → Copy |
支持 |
copy(CopyObjectRequest) |
copy(CopyRequest) |
Copy → Copy |
支持 |
uploadDirectory(String, String,
File, boolean) |
uploadDirectory(
UploadDirectoryRequest) |
MultipleFileUpload →
DirectoryUpload |
支持 |
downloadDirectory(String, String, File) |
downloadDirectory(
DownloadDirectoryRequest) |
MultipleFileDownload →
DirectoryDownload |
支持 |
可恢复的传输方法
| V1 方法 | V2 方法 | 迁移状态 |
|---|---|---|
resumeUpload(PersistableUpload) |
resumeUploadFile(ResumableFileUpload) |
支持 |
resumeDownload(PersistableDownload) |
resumeDownloadFile(ResumableFileDownload) |
支持 |
生命周期方法
| V1 方法 | V2 方法 | 迁移状态 |
|---|---|---|
shutdownNow() |
close() |
支持 |
shutdownNow(boolean) |
使用 close() 方法手动调整代码 |
不支持 |
不支持的 V1 客户端方法
| V1 方法 | V2 替代方案 | 备注 |
|---|---|---|
abortMultipartUploads(String, Date) |
使用低级别 Amazon S3 客户端 | 不支持 |
getAmazonS3Client() |
单独保存引用 | 不支持;v2 中没有 getter |
getConfiguration() |
单独保存引用 | 不支持;v2 中没有 getter |
uploadFileList(...) |
多次调用 uploadFile() |
不支持 |
使用 TransferStateChangeListener 参数的 copy 方法 |
使用 TransferListener |
参见手动迁移示例 |
使用 S3ProgressListener 参数的 download 方法 |
使用 TransferListener |
参见手动迁移示例 |
|
使用 4 个或更多参数的 |
参见手动迁移示例 | |
使用 ObjectMetadataProvider 参数的 upload 方法 |
在请求中设置元数据 | 参见手动迁移示例 |
使用 *Provider 参数的 uploadDirectory 方法 |
在请求中设置标签 | 参见手动迁移示例 |
使用 TransferStateChangeListener 参数的 copy 方法
-
copy(CopyObjectRequest copyObjectRequest, AmazonS3 srcS3, TransferStateChangeListener stateChangeListener) -
copy(CopyObjectRequest copyObjectRequest, TransferStateChangeListener stateChangeListener)
// V1 ---------------------------------------------------------------------------------------------- // Initialize source S3 client AmazonS3 s3client = AmazonS3ClientBuilder.standard() .withRegion("us-west-2") .build(); // Initialize Transfer Manager TransferManager tm = TransferManagerBuilder.standard() .withS3Client(srcS3) .build(); CopyObjectRequest copyObjectRequest = new CopyObjectRequest( "amzn-s3-demo-source-bucket", "source-key", "amzn-s3-demo-destination-bucket", "destination-key" ); TransferStateChangeListener stateChangeListener = new TransferStateChangeListener() { @Override public void transferStateChanged(Transfer transfer, TransferState state) { //Implementation of the TransferStateChangeListener } }; Copy copy = tm.copy(copyObjectRequest, srcS3, stateChangeListener); // V2 ---------------------------------------------------------------------------------------------- S3AsyncClient s3AsyncClient = S3AsyncClient.builder() .region(Region.US_WEST_2) .build(); S3TransferManager transferManager = S3TransferManager.builder() .s3Client(s3AsyncClient) .build(); // Create transfer listener (equivalent to TransferStateChangeListener in v1) TransferListener transferListener = new TransferListener() { @Override public void transferInitiated(Context.TransferInitiated context) { //Implementation System.out.println("Transfer initiated"); } @Override public void bytesTransferred(Context.BytesTransferred context) { //Implementation System.out.println("Bytes transferred"); } @Override public void transferComplete(Context.TransferComplete context) { //Implementation System.out.println("Transfer completed!"); } @Override public void transferFailed(Context.TransferFailed context) { //Implementation System.out.println("Transfer failed"); } }; CopyRequest copyRequest = CopyRequest.builder() .copyObjectRequest(req -> req .sourceBucket("amzn-s3-demo-source-bucket") .sourceKey("source-key") .destinationBucket("amzn-s3-demo-destination-bucket") .destinationKey("destination-key") ) .addTransferListener(transferListener) // Configure the transferListener into the request .build(); Copy copy = transferManager.copy(copyRequest);
使用 S3ProgressListener 参数的 download 方法
-
download(GetObjectRequest getObjectRequest, File file, S3ProgressListener progressListener) -
download(GetObjectRequest getObjectRequest, File file, S3ProgressListener progressListener, long timeoutMillis) -
download(GetObjectRequest getObjectRequest, File file, S3ProgressListener progressListener, long timeoutMillis, boolean resumeOnRetry)
// V1 ---------------------------------------------------------------------------------------------- S3ProgressListener progressListener = new S3ProgressListener() { @Override public void progressChanged(com.amazonaws.event.ProgressEvent progressEvent) { long bytes = progressEvent.getBytesTransferred(); ProgressEventType eventType = progressEvent.getEventType(); // Use bytes and eventType as needed } @Override public void onPersistableTransfer(PersistableTransfer persistableTransfer) { } }; Download download1 = tm.download(getObjectRequest, file, progressListener); Download download2 = tm.download(getObjectRequest, file, progressListener, timeoutMillis) Download download3 = tm.download(getObjectRequest, file, progressListener, timeoutMillis, true) // V2 ---------------------------------------------------------------------------------------------- TransferListener transferListener = new TransferListener() { @Override public void transferInitiated(Context.InitializedContext context) { // Equivalent to ProgressEventType.TRANSFER_STARTED_EVENT System.out.println("Transfer initiated"); } @Override public void bytesTransferred(Context.BytesTransferred context) { // Equivalent to ProgressEventType.REQUEST_BYTE_TRANSFER_EVENT long bytes = context.bytesTransferred(); System.out.println("Bytes transferred: " + bytes); } @Override public void transferComplete(Context.TransferComplete context) { // Equivalent to ProgressEventType.TRANSFER_COMPLETED_EVENT System.out.println("Transfer completed"); } @Override public void transferFailed(Context.TransferFailed context) { // Equivalent to ProgressEventType.TRANSFER_FAILED_EVENT System.out.println("Transfer failed: " + context.exception().getMessage()); } }; DownloadFileRequest downloadFileRequest = DownloadFileRequest.builder() .getObjectRequest(getObjectRequest) .destination(file.toPath()) .addTransferListener(transferListener) .build(); // For download1 FileDownload download = transferManager.downloadFile(downloadFileRequest); // For download2 CompletedFileDownload completedFileDownload = download.completionFuture() .get(timeoutMillis, TimeUnit.MILLISECONDS); // For download3, the v2 SDK does not have a direct equiavalent to the `resumeOnRetry` method of v1. // If a download is interrupted, you need to start a new download request.
使用 4 个或更多参数的 downloadDirectory 方法
-
downloadDirectory(String bucketName, String keyPrefix, File destinationDirectory, boolean resumeOnRetry) -
downloadDirectory(String bucketName, String keyPrefix, File destinationDirectory, boolean resumeOnRetry, KeyFilter filter) -
downloadDirectory(String bucketName, String keyPrefix, File destinationDirectory, KeyFilter filter)
// V1 ---------------------------------------------------------------------------------------------- KeyFilter filter = new KeyFilter() { @Override public boolean shouldInclude(S3ObjectSummary objectSummary) { //Filter implementation } }; MultipleFileDownload multipleFileDownload = tm.downloadDirectory(bucketName, keyPrefix, destinationDirectory, filter); // V2 ---------------------------------------------------------------------------------------------- // The v2 SDK does not have a direct equiavalent to the `resumeOnRetry` method of v1. // If a download is interrupted, you need to start a new download request. DownloadFilter filter = new DownloadFilter() { @Override public boolean test(S3Object s3Object) { // Filter implementation. } }; DownloadDirectoryRequest downloadDirectoryRequest = DownloadDirectoryRequest.builder() .bucket(bucketName) .filter(filter) .listObjectsV2RequestTransformer(builder -> builder.prefix(keyPrefix)) .destination(destinationDirectory.toPath()) .build(); DirectoryDownload directoryDownload = transferManager.downloadDirectory(downloadDirectoryRequest);
使用 ObjectMetadata 参数的 upload 方法
-
upload(String bucketName, String key, InputStream input, ObjectMetadata objectMetadata)
// V1 ----------------------------------------------------------------------------------------------ObjectMetadata metadata = new ObjectMetadata(); ObjectMetadata metadata = new ObjectMetadata(); metadata.setContentType("text/plain"); // System-defined metadata metadata.setContentLength(22L); // System-defined metadata metadata.addUserMetadata("myKey", "myValue"); // User-defined metadata PutObjectRequest putObjectRequest = new PutObjectRequest(bucketName, key, inputStream, metadata); Upload upload = transferManager.upload("amzn-s3-demo-bucket", "my-key", inputStream, metadata); // V2 ---------------------------------------------------------------------------------------------- /* When you use an InputStream to upload in V2, you should specify the content length and use `RequestBody.fromInputStream()`. If you don't provide the content length, the entire stream will be buffered in memory. If you can't determine the content length, we recommend using the CRT-based S3 client. */ Map<String, String> userMetadata = new HashMap<>(); userMetadata.put("x-amz-meta-myKey", "myValue"); PutObjectRequest putObjectRequest = PutObjectRequest.builder() .bucket("amzn-s3-demo-bucket1") .key("k") .contentType("text/plain") //System-defined metadata usually has separate methods in the builder. .contentLength(22L) .metadata(userMetadata) //metadata() is only for user-defined metadata. .build(); UploadRequest uploadRequest = UploadRequest.builder() .putObjectRequest(putObjectRequest) .requestBody(AsyncRequestBody.fromInputStream(stream, 22L, executor)) .build(); transferManager.upload(uploadRequest).completionFuture().join();
使用 ObjectMetadataProvider 参数的 uploadDirectory
-
uploadDirectory(String bucketName, String virtualDirectoryKeyPrefix, File directory, boolean includeSubdirectories, ObjectMetadataProvider metadataProvider) -
uploadDirectory(String bucketName, String virtualDirectoryKeyPrefix, File directory, boolean includeSubdirectories, ObjectMetadataProvider metadataProvider, ObjectTaggingProvider taggingProvider) -
uploadDirectory(String bucketName, String virtualDirectoryKeyPrefix, File directory, boolean includeSubdirectories, ObjectMetadataProvider metadataProvider, ObjectTaggingProvider taggingProvider, ObjectCannedAclProvider cannedAclProvider)
// V1 ---------------------------------------------------------------------------------------------- tm.uploadDirectory(bucketName, virtualDirectoryKeyPrefix, directory, includeSubdirectories, metadataProvider) tm.uploadDirectory(bucketName, virtualDirectoryKeyPrefix, directory, includeSubdirectories, metadataProvider, taggingProvider) tm.uploadDirectory(bucketName, virtualDirectoryKeyPrefix, directory, includeSubdirectories, metadataProvider, taggingProvider, cannedAclProvider) // V2 ---------------------------------------------------------------------------------------------- UploadDirectoryRequest request = UploadDirectoryRequest.builder() .bucket(bucketName) .s3Prefix(virtualDirectoryKeyPrefix) .source(directory.toPath()) .maxDepth(includeSubdirectories ? Integer.MAX_VALUE : 1) .uploadFileRequestTransformer(builder -> { // 1.Replace `ObjectMetadataProvider`, `ObjectTaggingProvider`, and `ObjectCannedAclProvider` with an // `UploadFileRequestTransformer` that can combine the functionality of all three *Provider implementations. // 2. Convert your v1 `ObjectMetadata` to v2 `PutObjectRequest` parameters. // 3. Convert your v1 `ObjectTagging` to v2 `Tagging`. // 4. Convert your v1 `CannedAccessControlList` to v2 `ObjectCannedACL`. }) .build(); DirectoryUpload directoryUpload = transferManager.uploadDirectory(request);
模型对象迁移
在 AWS SDK for Java 2.x 中,许多 TransferManager 模型对象经过重新设计,不再支持 v1 模型对象中可用的几个 getter 和 setter 方法。
在 v2 中,您可以使用 CompletableFuture<T> 类在传输完成时执行操作,无论传输是成功还是出现异常。如果需要,您可以使用 join() 方法来等待完成。
核心传输对象
| V1 类 | V2 类 | 迁移状态 |
|---|---|---|
TransferManager |
S3TransferManager |
支持 |
TransferManagerBuilder |
S3TransferManager.Builder |
支持 |
Transfer |
Transfer |
支持 |
AbortableTransfer |
Transfer |
支持(没有单独的类) |
Copy |
Copy |
支持 |
Download |
FileDownload |
支持 |
Upload |
Upload / FileUpload |
支持 |
MultipleFileDownload |
DirectoryDownload |
支持 |
MultipleFileUpload |
DirectoryUpload |
支持 |
持久化对象
| V1 类 | V2 类 | 迁移状态 |
|---|---|---|
PersistableDownload |
ResumableFileDownload |
支持 |
PersistableUpload |
ResumableFileUpload |
支持 |
PersistableTransfer |
ResumableTransfer |
支持 |
PauseResult<T> |
直接可恢复对象 | 不支持 |
结果对象
| V1 类 | V2 类 | 迁移状态 |
|---|---|---|
CopyResult |
CompletedCopy |
支持 |
UploadResult |
CompletedUpload |
支持 |
配置对象
| V1 类 | V2 类 | 迁移状态 |
|---|---|---|
TransferManagerConfiguration |
MultipartConfiguration(在 Amazon S3 客户端上) |
支持 |
TransferProgress |
TransferProgress + TransferProgressSnapshot |
支持 |
KeyFilter |
DownloadFilter |
支持 |
不支持的对象
| V1 类 | V2 替代方案 | 迁移状态 |
|---|---|---|
PauseStatus |
不支持 | 不支持 |
UploadContext |
不支持 | 不支持 |
ObjectCannedAclProvider |
PutObjectRequest.builder().acl() |
不支持 |
ObjectMetadataProvider |
PutObjectRequest.builder().metadata() |
不支持 |
ObjectTaggingProvider |
PutObjectRequest.builder().tagging() |
不支持 |
PresignedUrlDownload |
不支持 | 不支持 |
TransferManagerBuilder 配置迁移
配置更改
您需要为 v2 传输管理器设置的配置更改取决于您使用的 S3 客户端。您可以选择基于 AWS CRT 的 S3 客户端或基于 Java 的标准 S3 异步客户端。有关差异的信息,请参阅 AWS SDK for Java 2.x 中的 S3 客户端 主题。
行为更改
异步操作
V1(阻塞):
Upload upload = transferManager.upload("amzn-s3-demo-bucket", "key", file);
upload.waitForCompletion(); // Blocks until complete
V2(异步):
FileUpload upload = transferManager.uploadFile(UploadFileRequest.builder()
.putObjectRequest(PutObjectRequest.builder()
.bucket("amzn-s3-demo-bucket")
.key("key")
.build())
.source(file)
.build());
CompletedFileUpload result = upload.completionFuture().join(); // Blocks until complete
// Or handle asynchronously:
upload.completionFuture().thenAccept(result -> {
System.out.println("Upload completed: " + result.response().eTag());
});
错误处理
V1:如果任何子请求失败,则目录传输将完全失败。
V2:即使某些子请求失败,目录传输也会成功完成。明确检查错误:
DirectoryUpload directoryUpload = transferManager.uploadDirectory(request);
CompletedDirectoryUpload result = directoryUpload.completionFuture().join();
// Check for failed transfers
if (!result.failedTransfers().isEmpty()) {
System.out.println("Some uploads failed:");
result.failedTransfers().forEach(failed ->
System.out.println("Failed: " + failed.exception().getMessage()));
}
通过字节范围提取进行并行下载
在 v2 SDK 中启用自动并行传输功能后,S3 Transfer Manager 使用字节范围提取来并行检索对象的特定部分(分段下载)。使用 v2 下载对象的方式不取决于对象最初的上传方式。所有下载都可以从高吞吐量和并发性中受益。
相比之下,使用 v1 Transfer Manager 时,对象最初的上传方式确实很重要。v1 Transfer Manager 检索对象各个分段的方式与上传分段的方式相同。如果对象最初是作为单个对象上传的,则 v1 Transfer Manager 无法通过使用子请求来加速下载过程。