本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
Amazon S3 实用程序
Amazon S3 Transfer Manager
Amazon S3 上传和下载管理器可以将大型对象分成多个分段,以便并行传输。这样可以轻松恢复中断的传输。
Amazon S3 上传管理器
Amazon S3 上传管理器会判断是否可将文件拆分为较小的分段,然后并行上传。您可以自定义并行上传的数量和已上传分段的大小。
以下示例使用 Amazon S3 Uploader 上传一个文件。使用 Uploader 与 s3.PutObject() 操作类似。
import "context" import "github.com/aws/aws-sdk-go-v2/config" import "github.com/aws/aws-sdk-go-v2/service/s3" import "github.com/aws/aws-sdk-go-v2/feature/s3/manager" // ... cfg, err := config.LoadDefaultConfig(context.TODO()) if err != nil { log.Printf("error: %v", err) return } client := s3.NewFromConfig(cfg) uploader := manager.NewUploader(client) result, err := uploader.Upload(context.TODO(), &s3.PutObjectInput{ Bucket: aws.String("amzn-s3-demo-bucket"), Key: aws.String("my-object-key"), Body: uploadFile, })
配置选项
使用实例化Uploader实例时 NewUploaderNewUploader 提供一个或多个参数来覆盖选项。这些选项包括:
-
PartSize– 指定要上传的每个分段的缓冲区大小(以字节为单位)。每个分段最小大小为 5 MiB。 -
Concurrency– 指定要并行上传的分段的数量。 -
LeavePartsOnError– 指示是否将成功上传的分段保留在 Amazon S3 中。
Concurrency 值限制了给定 Upload 调用可能发生的分段上传的并发数量。这不是全局客户端并发限制。调整 PartSize 和 Concurrency 配置值以找到最佳配置。例如,具有高带宽连接的系统可以并行发送更大的分段和进行更多的上传。
例如,您的应用程序通过将 Concurrency 设置为 5,对 Uploader 进行配置。如果应用程序随后从两个不同的 goroutine 中调用 Upload,则结果是 10 个并发分段上传(2 个 goroutine * 5 Concurrency)。
警告
您的应用程序应限制对 Upload 的并发调用次数,以防止应用程序资源耗尽。
以下是在 Uploader 创建期间设置默认分段大小的示例:
uploader := manager.NewUploader(client, func(u *Uploader) { u.PartSize = 10 * 1024 * 1024, // 10 MiB })
有关Uploader其配置的更多信息,请参阅 适用于 Go 的 AWS SDK API 参考中的 Uploader
PutObjectInput Body Field (io. ReadSeeker 与 io.Reader 对比)
s3.PutObjectInput 结构的 Body 字段属于 io.Reader 类型。但是,可以用同时满足 io.ReadSeeker 和 io.ReaderAt 接口的类型填充此字段,以提高主机环境的应用程序资源利用率。以下示例创建了满足这两个接口的类型 ReadSeekerAt:
type ReadSeekerAt interface { io.ReadSeeker io.ReaderAt }
对于 io.Reader 类型,必须先将读取器的字节缓存到内存中,然后才能上传分段。增加 PartSize 或 Concurrency 值时,Uploader 所需的内存(RAM)会显著增加。所需的内存约为 PartSize * Concurrency。例如,为 PartSize 指定 100 MB,为 Concurrency 指定 10 MB,那么至少需要 1 GB。
由于 io.Reader 类型在读取字节之前无法确定其大小,Uploader 无法计算要上传多少分段。因此,如果将 PartSize 值设置得过低,Uploader 可能会达到 Amazon S3 对于大文件的上传限制,即 10000 个分段。如果尝试上传超过 10000 个分段,则上传将停止并返回错误。
对于实现 ReadSeekerAt 类型的 body 值,在将正文内容发送到 Amazon S3 之前,Uploader 不会将其缓冲到内存中。Uploader 在将文件上传到 Amazon S3 之前会计算预期分段数。如果 PartSize 的当前值需要超过 10000 个分段才能上传文件,则 Uploader 会增加分段大小值以减少所需的分段。
处理失败的上传
如果上传到 Amazon S3 失败,默认情况下,Uploader 会使用 Amazon S3 AbortMultipartUpload 操作来删除已上传的分段。此功能可确保失败的上传不会占用 Amazon S3 存储空间。
您可以将 LeavePartsOnError 设置为 true,这样 Uploader 就不会删除成功上传的分段。这对于恢复部分完成的上传非常有用。要对上传的分段进行操作,您必须获取失败上传的 UploadID。以下示例演示了如何使用 manager.MultiUploadFailure 错误接口类型来获取 UploadID。
result, err := uploader.Upload(context.TODO(), &s3.PutObjectInput{ Bucket: aws.String("amzn-s3-demo-bucket"), Key: aws.String("my-object-key"), Body: uploadFile, }) output, err := u.upload(input) if err != nil { var mu manager.MultiUploadFailure if errors.As(err, &mu) { // Process error and its associated uploadID fmt.Println("Error:", mu) _ = mu.UploadID() // retrieve the associated UploadID } else { // Process error generically fmt.Println("Error:", err.Error()) } return }
每次上传时覆盖上传器选项
调用 Upload 时,可以通过向该方法提供一个或多个参数来覆盖 Uploader 选项。这些覆盖是并发安全的修改,不会影响正在进行的上传或随后对管理器的 Upload 调用。例如,覆盖特定上传请求的 PartSize 配置:
params := &s3.PutObjectInput{ Bucket: aws.String("amzn-s3-demo-bucket"), Key: aws.String("my-key"), Body: myBody, } resp, err := uploader.Upload(context.TODO(), params, func(u *manager.Uploader) { u.PartSize = 10 * 1024 * 1024, // 10 MiB })
示例
将文件夹上传到 Amazon S3
以下示例使用 path/filepath 程序包以递归方式收集文件列表,并将其上传到指定的 Amazon S3 存储桶。Amazon S3 对象键以文件的相对路径为前缀。
package main import ( "context" "log" "os" "path/filepath" "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/config" "github.com/aws/aws-sdk-go-v2/feature/s3/manager" "github.com/aws/aws-sdk-go-v2/service/s3" ) var ( localPath string bucket string prefix string ) func init() { if len(os.Args) != 4 { log.Fatalln("Usage:", os.Args[0], "<local path> <bucket> <prefix>") } localPath = os.Args[1] bucket = os.Args[2] prefix = os.Args[3] } func main() { walker := make(fileWalk) go func() { // Gather the files to upload by walking the path recursively if err := filepath.Walk(localPath, walker.Walk); err != nil { log.Fatalln("Walk failed:", err) } close(walker) }() cfg, err := config.LoadDefaultConfig(context.TODO()) if err != nil { log.Fatalln("error:", err) } // For each file found walking, upload it to Amazon S3 uploader := manager.NewUploader(s3.NewFromConfig(cfg)) for path := range walker { rel, err := filepath.Rel(localPath, path) if err != nil { log.Fatalln("Unable to get relative path:", path, err) } file, err := os.Open(path) if err != nil { log.Println("Failed opening file", path, err) continue } defer file.Close() result, err := uploader.Upload(context.TODO(), &s3.PutObjectInput{ Bucket: &bucket, Key: aws.String(filepath.Join(prefix, rel)), Body: file, }) if err != nil { log.Fatalln("Failed to upload", path, err) } log.Println("Uploaded", path, result.Location) } } type fileWalk chan string func (f fileWalk) Walk(path string, info os.FileInfo, err error) error { if err != nil { return err } if !info.IsDir() { f <- path } return nil }
下载管理器
Amazon S3 下载器
示例:下载文件
以下示例使用 Amazon S3 Downloader 下载一个文件。使用Downloader与 s 3 类似。 GetObject
import "context" import "github.com/aws/aws-sdk-go-v2/aws" import "github.com/aws/aws-sdk-go-v2/config" import "github.com/aws/aws-sdk-go-v2/service/s3" import "github.com/aws/aws-sdk-go-v2/feature/s3/manager" // ... cfg, err := config.LoadDefaultConfig(context.TODO()) if err != nil { log.Println("error:", err) return } client := s3.NewFromConfig(cfg) downloader := manager.NewDownloader(client) numBytes, err := downloader.Download(context.TODO(), downloadFile, &s3.GetObjectInput{ Bucket: aws.String("amzn-s3-demo-bucket"), Key: aws.String("my-key"), })
downloadFile 参数属于 io.WriterAt 类型。WriterAt 接口使 Downloader 能够并行写入文件的多个分段。
配置选项
对 Downloader 实例进行实例化时,您可以指定配置选项以自定义对象的下载方式:
-
PartSize– 指定要下载的每个分段的缓冲区大小(以字节为单位)。每个分段最小大小为 5 MB。 -
Concurrency– 指定要并行下载的分段的数量。
Concurrency 值限制了给定 Download 调用可能发生的分段下载的并发数量。这不是全局客户端并发限制。调整 PartSize 和 Concurrency 配置值以找到最佳配置。例如,具有高带宽连接的系统可以并行接收更大的分段和进行更多的下载。
例如,您的应用程序通过将 Concurrency 设置为 5,对 Downloader 进行配置。应用程序随后从两个不同的 goroutine 中调用 Download,结果将是 10 个并发分段下载(2 个 goroutine * 5 Concurrency)。
警告
您的应用程序应限制对 Download 的并发调用次数,以防止应用程序资源耗尽。
有关Downloader及其其他配置选项的更多信息,请参阅 API 参考中的 Manager.downloader
每次下载时覆盖下载器选项
调用 Download 时,可以通过向该方法提供一个或多个函数参数来覆盖 Downloader 选项。这些覆盖是并发安全的修改,不会影响正在进行的上传或随后对管理器的 Download 调用。例如,覆盖特定上传请求的 PartSize 配置:
params := &s3.GetObjectInput{ Bucket: aws.String("amzn-s3-demo-bucket"), Key: aws.String("my-key"), } resp, err := downloader.Download(context.TODO(), targetWriter, params, func(u *manager.Downloader) { u.PartSize = 10 * 1024 * 1024, // 10 MiB })
示例
下载存储桶中的所有对象
以下示例使用分页功能从 Amazon S3 存储桶收集对象列表。然后将每个对象下载到本地文件。
package main import ( "context" "fmt" "log" "os" "path/filepath" "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/config" "github.com/aws/aws-sdk-go-v2/feature/s3/manager" "github.com/aws/aws-sdk-go-v2/service/s3" ) var ( Bucket = "amzn-s3-demo-bucket" // Download from this bucket Prefix = "logs/" // Using this key prefix LocalDirectory = "s3logs" // Into this directory ) func main() { cfg, err := config.LoadDefaultConfig(context.TODO()) if err != nil { log.Fatalln("error:", err) } client := s3.NewFromConfig(cfg) manager := manager.NewDownloader(client) paginator := s3.NewListObjectsV2Paginator(client, &s3.ListObjectsV2Input{ Bucket: &Bucket, Prefix: &Prefix, }) for paginator.HasMorePages() { page, err := paginator.NextPage(context.TODO()) if err != nil { log.Fatalln("error:", err) } for _, obj := range page.Contents { if err := downloadToFile(manager, LocalDirectory, Bucket, aws.ToString(obj.Key)); err != nil { log.Fatalln("error:", err) } } } } func downloadToFile(downloader *manager.Downloader, targetDirectory, bucket, key string) error { // Create the directories in the path file := filepath.Join(targetDirectory, key) if err := os.MkdirAll(filepath.Dir(file), 0775); err != nil { return err } // Set up the local file fd, err := os.Create(file) if err != nil { return err } defer fd.Close() // Download the file using the AWS SDK for Go fmt.Printf("Downloading s3://%s/%s to %s...\n", bucket, key, file) _, err = downloader.Download(context.TODO(), fd, &s3.GetObjectInput{Bucket: &bucket, Key: &key}) return err }
GetBucketRegion
GetBucketRegionGetBucketRegion获取 Amazon S3 客户端,并使用它来确定请求的存储桶在与该客户配置的区域关联的 AWS 分区中的位置。
例如,查找存储桶 的区域:amzn-s3-demo-bucket
cfg, err := config.LoadDefaultConfig(context.TODO()) if err != nil { log.Println("error:", err) return } bucket := "amzn-s3-demo-bucket" region, err := manager.GetBucketRegion(ctx, s3.NewFromConfig(cfg), bucket) if err != nil { var bnf manager.BucketNotFound if errors.As(err, &bnf) { log.Printf("unable to find bucket %s's Region\n", bucket) } else { log.Println("error:", err) } return } fmt.Printf("Bucket %s is in %s region\n", bucket, region)
GetBucketRegion如果无法解析存储桶的位置,则该函数将返回BucketNotFound
不可搜寻的流输入
对于 PutObject 和 UploadPart 之类的 API 操作,默认情况下,Amazon S3 客户端需要 Body 输入参数的值来实现 io.Seekerio.Seeker 接口来确定要上传的值的长度,并计算请求签名的有效载荷哈希值。如果 Body 输入参数值未实现 io.Seeker,您的应用程序将收到错误。
operation error S3: PutObject, failed to compute payload hash: failed to seek body to start, request stream is not seekable
可以通过使用函数选项修改操作方法的 中间件 来更改此行为。W ith APIOptions
resp, err := client.PutObject(context.TODO(), &s3.PutObjectInput{ Bucket: &bucketName, Key: &objectName, Body: bytes.NewBuffer([]byte(`example object!`)), ContentLength: 15, // length of body }, s3.WithAPIOptions( v4.SwapComputePayloadSHA256ForUnsignedPayloadMiddleware, ))
警告
Amazon S3 要求为上传到存储桶的所有对象提供内容长度。由于 Body 输入参数未实现 io.Seeker 接口,因此客户端将无法计算请求的 ContentLength 参数。该参数必须由应用程序提供。如果未提供 ContentLength 参数,请求将失败。
对于不可搜寻且长度未知的上传,请使用 SDK 的 Amazon S3 上传管理器。