Upload streams to Amazon S3 with the AWS SDK for Java 2.x
When you upload content with putObject or uploadPart, you provide the stream through the RequestBody factory class for the synchronous API. For the asynchronous API, AsyncRequestBody is the equivalent factory class.
Which methods can upload streams?
For the synchronous API, you can use the following factory methods of RequestBody to provide the stream:

- fromInputStream(InputStream inputStream, long contentLength)
- fromContentProvider(ContentStreamProvider provider, long contentLength, String mimeType)
  - ContentStreamProvider has a fromInputStream(InputStream inputStream) factory method
- fromContentProvider(ContentStreamProvider provider, String mimeType)
For the asynchronous API, you can use the following factory methods of AsyncRequestBody:

- fromInputStream(InputStream inputStream, Long contentLength, ExecutorService executor)
- fromInputStream(AsyncRequestBodyFromInputStreamConfiguration configuration)
  - Use AsyncRequestBodyFromInputStreamConfiguration.Builder to supply the stream
- fromInputStream(Consumer<AsyncRequestBodyFromInputStreamConfiguration.Builder> configuration)
- forBlockingInputStream(Long contentLength)
  - The returned BlockingInputStreamAsyncRequestBody has a writeInputStream(InputStream inputStream) method that you can use to provide the stream
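The forBlockingInputStream path is listed above but not demonstrated later in this topic. The following is a minimal sketch of the call order, assuming the AWS SDK for Java 2.x S3 dependency is on the classpath; the bucket and key names are placeholders:

```java
import java.io.InputStream;
import java.util.concurrent.CompletableFuture;

import software.amazon.awssdk.core.async.AsyncRequestBody;
import software.amazon.awssdk.core.async.BlockingInputStreamAsyncRequestBody;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.model.PutObjectResponse;

public class BlockingBodySketch {
    public PutObjectResponse upload(S3AsyncClient s3AsyncClient, InputStream inputStream) {
        // Create the request body first; 'null' means the content length is unknown.
        BlockingInputStreamAsyncRequestBody body =
                AsyncRequestBody.forBlockingInputStream(null);

        // Start the request before writing, so the SDK is ready to consume the bytes.
        CompletableFuture<PutObjectResponse> responseFuture =
                s3AsyncClient.putObject(r -> r.bucket("amzn-s3-demo-bucket").key("my-key"), body);

        // Blocks the calling thread until the whole stream has been consumed.
        body.writeInputStream(inputStream);

        return responseFuture.join();
    }
}
```

Note that the putObject call must be issued before writeInputStream, because writeInputStream blocks until the SDK has read the stream to its end.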
Perform the upload
If you know the length of the stream
As the method signatures shown previously indicate, most methods accept a content-length parameter. If you know the content length in bytes, provide the exact value:
// Always provide the exact content length when it's available.
long contentLength = 1024; // Exact size in bytes.
s3Client.putObject(req -> req
        .bucket("amzn-s3-demo-bucket")
        .key("my-key"),
    RequestBody.fromInputStream(inputStream, contentLength));
Warning

When you upload from an input stream, if the specified content length does not match the actual number of bytes, you might see the following:

- A truncated object if the specified length is too small
- A failed upload or a hanging connection if the specified length is too large
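A common source of such mismatches is passing a character count instead of the encoded byte count. The following stdlib-only sketch (the class name ExactLengthDemo is illustrative) shows how to measure the exact byte length you would pass as contentLength:

```java
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;

public class ExactLengthDemo {
    public static void main(String[] args) {
        // 'é' encodes to two bytes in UTF-8, so String.length() undercounts the payload.
        String text = "héllo";
        byte[] data = text.getBytes(StandardCharsets.UTF_8);
        long contentLength = data.length; // Exact byte count to pass as the content length.
        InputStream stream = new ByteArrayInputStream(data); // Stream to upload.
        System.out.println("chars=" + text.length() + " bytes=" + contentLength);
    }
}
```

Measure the bytes you will actually send (after encoding or compression), not an upstream character or record count.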
If you don't know the length of the stream
Use the synchronous API
Use fromContentProvider(ContentStreamProvider provider, String mimeType):

public PutObjectResponse syncClient_stream_unknown_size(String bucketName, String key, InputStream inputStream) {
    S3Client s3Client = S3Client.create();
    RequestBody body = RequestBody.fromContentProvider(
            ContentStreamProvider.fromInputStream(inputStream),
            "text/plain");
    return s3Client.putObject(b -> b.bucket(bucketName).key(key), body);
}
Because the SDK buffers the entire stream in memory to calculate the content length, you can run out of memory when you work with large streams. If you need to upload large streams with the synchronous client, consider using the multipart API:
// Minimum size for all parts except the last; S3 requires at least 5 MiB per part.
private static final int PART_SIZE = 5 * 1024 * 1024;

public static void uploadStreamToS3(String bucketName, String key, InputStream inputStream) {
    // Create S3 client.
    S3Client s3Client = S3Client.create();
    try {
        // Step 1: Initiate the multipart upload.
        CreateMultipartUploadRequest createMultipartUploadRequest = CreateMultipartUploadRequest.builder()
                .bucket(bucketName)
                .key(key)
                .build();
        CreateMultipartUploadResponse createResponse = s3Client.createMultipartUpload(createMultipartUploadRequest);
        String uploadId = createResponse.uploadId();
        System.out.println("Started multipart upload with ID: " + uploadId);

        // Step 2: Upload parts.
        List<CompletedPart> completedParts = new ArrayList<>();
        int partNumber = 1;
        byte[] buffer = new byte[PART_SIZE];
        int bytesRead;
        try {
            while ((bytesRead = readFullyOrToEnd(inputStream, buffer)) > 0) {
                // Create the request to upload a part.
                UploadPartRequest uploadPartRequest = UploadPartRequest.builder()
                        .bucket(bucketName)
                        .key(key)
                        .uploadId(uploadId)
                        .partNumber(partNumber)
                        .build();

                // If we didn't read a full buffer, create a properly sized byte array.
                RequestBody requestBody;
                if (bytesRead < PART_SIZE) {
                    byte[] lastPartBuffer = new byte[bytesRead];
                    System.arraycopy(buffer, 0, lastPartBuffer, 0, bytesRead);
                    requestBody = RequestBody.fromBytes(lastPartBuffer);
                } else {
                    requestBody = RequestBody.fromBytes(buffer);
                }

                // Upload the part and save the response's ETag.
                UploadPartResponse uploadPartResponse = s3Client.uploadPart(uploadPartRequest, requestBody);
                CompletedPart part = CompletedPart.builder()
                        .partNumber(partNumber)
                        .eTag(uploadPartResponse.eTag())
                        .build();
                completedParts.add(part);

                System.out.println("Uploaded part " + partNumber + " with size " + bytesRead + " bytes");
                partNumber++;
            }

            // Step 3: Complete the multipart upload.
            CompletedMultipartUpload completedMultipartUpload = CompletedMultipartUpload.builder()
                    .parts(completedParts)
                    .build();
            CompleteMultipartUploadRequest completeRequest = CompleteMultipartUploadRequest.builder()
                    .bucket(bucketName)
                    .key(key)
                    .uploadId(uploadId)
                    .multipartUpload(completedMultipartUpload)
                    .build();
            CompleteMultipartUploadResponse completeResponse = s3Client.completeMultipartUpload(completeRequest);
            System.out.println("Multipart upload completed. Object URL: " + completeResponse.location());
        } catch (Exception e) {
            // If an error occurs, abort the multipart upload.
            System.err.println("Error during multipart upload: " + e.getMessage());
            AbortMultipartUploadRequest abortRequest = AbortMultipartUploadRequest.builder()
                    .bucket(bucketName)
                    .key(key)
                    .uploadId(uploadId)
                    .build();
            s3Client.abortMultipartUpload(abortRequest);
            System.err.println("Multipart upload aborted");
        } finally {
            try {
                inputStream.close();
            } catch (IOException e) {
                System.err.println("Error closing input stream: " + e.getMessage());
            }
        }
    } finally {
        s3Client.close();
    }
}

/**
 * Reads from the input stream into the buffer, attempting to fill the buffer completely
 * or until the end of the stream is reached.
 *
 * @param inputStream the input stream to read from
 * @param buffer the buffer to fill
 * @return the number of bytes read, or -1 if the end of the stream is reached before any bytes are read
 * @throws IOException if an I/O error occurs
 */
private static int readFullyOrToEnd(InputStream inputStream, byte[] buffer) throws IOException {
    int totalBytesRead = 0;
    int bytesRead;
    while (totalBytesRead < buffer.length) {
        bytesRead = inputStream.read(buffer, totalBytesRead, buffer.length - totalBytesRead);
        if (bytesRead == -1) {
            break; // End of stream.
        }
        totalBytesRead += bytesRead;
    }
    return totalBytesRead > 0 ? totalBytesRead : -1;
}
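The readFullyOrToEnd helper above guarantees that every part except the last fills the whole buffer, even when the underlying InputStream returns short reads. A minimal stdlib-only sketch of that behavior (the helper is copied verbatim; a 10-byte stream with a 4-byte part buffer yields parts of 4, 4, and 2 bytes):

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;

public class ReadFullyDemo {
    // Same logic as the helper in the multipart example.
    private static int readFullyOrToEnd(InputStream inputStream, byte[] buffer) throws IOException {
        int totalBytesRead = 0;
        while (totalBytesRead < buffer.length) {
            int bytesRead = inputStream.read(buffer, totalBytesRead, buffer.length - totalBytesRead);
            if (bytesRead == -1) {
                break; // End of stream.
            }
            totalBytesRead += bytesRead;
        }
        return totalBytesRead > 0 ? totalBytesRead : -1;
    }

    public static void main(String[] args) throws IOException {
        InputStream in = new ByteArrayInputStream(new byte[10]); // 10-byte stream.
        byte[] buffer = new byte[4]; // Tiny "part size" for illustration only.
        int bytesRead;
        while ((bytesRead = readFullyOrToEnd(in, buffer)) > 0) {
            System.out.println("part of " + bytesRead + " bytes");
        }
    }
}
```

This is why the real example only needs to right-size the byte array for the final part.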
Note

For most use cases, we recommend using the asynchronous client APIs for streams of unknown size. This approach enables parallel transfers and offers a simpler programming interface, because the SDK splits a large stream into parts as it processes it.

Both the standard S3 asynchronous client with multipart enabled and the AWS CRT-based S3 client implement this approach. We show examples of this approach in the next section.
Use the asynchronous API
You can pass null for the contentLength parameter of fromInputStream(InputStream inputStream, Long contentLength, ExecutorService executor).
Example: Use the AWS CRT-based asynchronous client:
public PutObjectResponse crtClient_stream_unknown_size(String bucketName, String key, InputStream inputStream) {
    S3AsyncClient s3AsyncClient = S3AsyncClient.crtCreate();
    ExecutorService executor = Executors.newSingleThreadExecutor();
    AsyncRequestBody body = AsyncRequestBody.fromInputStream(
            inputStream,
            null, // 'null' indicates that the content length is unknown.
            executor);

    CompletableFuture<PutObjectResponse> responseFuture =
            s3AsyncClient.putObject(r -> r.bucket(bucketName).key(key), body)
                    .exceptionally(e -> {
                        if (e != null) {
                            logger.error(e.getMessage(), e);
                        }
                        return null;
                    });

    PutObjectResponse response = responseFuture.join(); // Wait for the response.
    executor.shutdown();
    return response;
}
Example: Use the standard asynchronous client with multipart enabled:
public PutObjectResponse asyncClient_multipart_stream_unknown_size(String bucketName, String key, InputStream inputStream) {
    S3AsyncClient s3AsyncClient = S3AsyncClient.builder()
            .multipartEnabled(true)
            .build();
    ExecutorService executor = Executors.newSingleThreadExecutor();
    AsyncRequestBody body = AsyncRequestBody.fromInputStream(
            inputStream,
            null, // 'null' indicates that the content length is unknown.
            executor);

    CompletableFuture<PutObjectResponse> responseFuture =
            s3AsyncClient.putObject(r -> r.bucket(bucketName).key(key), body)
                    .exceptionally(e -> {
                        if (e != null) {
                            logger.error(e.getMessage(), e);
                        }
                        return null;
                    });

    PutObjectResponse response = responseFuture.join(); // Wait for the response.
    executor.shutdown();
    return response;
}