Upload streams to Amazon S3 using the AWS SDK for Java 2.x
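When you use a stream to upload content to S3 with putObject or uploadPart, you supply the stream through the RequestBody factory class for the synchronous API. For the asynchronous API, AsyncRequestBody is the equivalent factory class.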
Which methods upload streams?
For the synchronous API, you can use the following factory methods of RequestBody to supply the stream:
- fromInputStream(InputStream inputStream, long contentLength)
- fromContentProvider(ContentStreamProvider provider, long contentLength, String mimeType)
  - ContentStreamProvider has the fromInputStream(InputStream inputStream) factory method
- fromContentProvider(ContentStreamProvider provider, String mimeType)
For the asynchronous API, you can use the following factory methods of AsyncRequestBody:
- fromInputStream(InputStream inputStream, Long contentLength, ExecutorService executor)
- fromInputStream(AsyncRequestBodyFromInputStreamConfiguration configuration)
  - You use AsyncRequestBodyFromInputStreamConfiguration.Builder to supply the stream
- fromInputStream(Consumer<AsyncRequestBodyFromInputStreamConfiguration.Builder> configuration)
- forBlockingInputStream(Long contentLength)
  - The resulting BlockingInputStreamAsyncRequestBody has the method writeInputStream(InputStream inputStream), which you can use to supply the stream
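Performing the upload
If you know the length of the stream
As the method signatures shown previously indicate, most methods accept a content length parameter.
If you know the content length in bytes, provide the exact value: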
    // Always provide the exact content length when it's available.
    long contentLength = 1024; // Exact size in bytes.
    s3Client.putObject(req -> req
            .bucket("my-bucket")
            .key("my-key"),
        RequestBody.fromInputStream(inputStream, contentLength));
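Warning
When you upload from an input stream, if the content length you specify doesn't match the actual byte count, you might experience:
- Truncated objects if the specified length is too small
- Failed uploads or hanging connections if the specified length is too large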
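One way to catch a length mismatch during development is to count the bytes that actually flow through the stream and compare the total with the value you intended to declare. The following is a minimal, JDK-only sketch of that idea. The class CountingInputStream and its method drainAndCount are hypothetical helpers introduced here for illustration; they are not part of the AWS SDK.

```java
import java.io.ByteArrayInputStream;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;

/** Hypothetical helper (not an SDK class): counts the bytes read through a stream. */
final class CountingInputStream extends FilterInputStream {
    private long count;

    CountingInputStream(InputStream in) {
        super(in);
    }

    @Override
    public int read() throws IOException {
        int b = super.read();
        if (b != -1) {
            count++; // One byte was delivered to the caller.
        }
        return b;
    }

    @Override
    public int read(byte[] buf, int off, int len) throws IOException {
        int n = super.read(buf, off, len);
        if (n > 0) {
            count += n; // n bytes were delivered to the caller.
        }
        return n;
    }

    long bytesRead() {
        return count;
    }

    /** Reads the stream to its end and returns the total number of bytes seen. */
    static long drainAndCount(InputStream source) {
        CountingInputStream counting = new CountingInputStream(source);
        byte[] scratch = new byte[8192];
        try {
            while (counting.read(scratch, 0, scratch.length) != -1) {
                // Discard the data; only the count matters here.
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return counting.bytesRead();
    }

    public static void main(String[] args) {
        long declared = 1024; // The length you planned to pass as contentLength.
        long actual = drainAndCount(new ByteArrayInputStream(new byte[1024]));
        System.out.println(declared == actual ? "lengths match" : "length mismatch");
    }
}
```

Draining a stream consumes it, so this check suits tests or sources that can be reopened. In an upload path, you could instead wrap the stream before handing it to the SDK and inspect bytesRead() afterward.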
If you don't know the length of the stream
Using the synchronous API
Use fromContentProvider(ContentStreamProvider provider, String mimeType):
    public PutObjectResponse syncClient_stream_unknown_size(String bucketName, String key, InputStream inputStream) {
        S3Client s3Client = S3Client.create();

        // No content length is passed, so the SDK determines it from the stream.
        RequestBody body = RequestBody.fromContentProvider(
            ContentStreamProvider.fromInputStream(inputStream), "text/plain");

        // Use the method parameters for the bucket and key.
        PutObjectResponse putObjectResponse = s3Client.putObject(b -> b.bucket(bucketName).key(key), body);
        return putObjectResponse;
    }
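Because the SDK buffers the entire stream in memory to calculate the content length, you can run into memory issues with large streams. If you need to upload large streams with the synchronous client, consider using the multipart API: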
    // PART_SIZE is assumed to be a class-level int constant defined elsewhere,
    // for example 5 * 1024 * 1024 (S3 requires at least 5 MiB for every part except the last).
    public static void uploadStreamToS3(String bucketName, String key, InputStream inputStream) {
        // Create S3 client
        S3Client s3Client = S3Client.create();
        try {
            // Step 1: Initiate the multipart upload
            CreateMultipartUploadRequest createMultipartUploadRequest = CreateMultipartUploadRequest.builder()
                .bucket(bucketName)
                .key(key)
                .build();
            CreateMultipartUploadResponse createResponse = s3Client.createMultipartUpload(createMultipartUploadRequest);
            String uploadId = createResponse.uploadId();
            System.out.println("Started multipart upload with ID: " + uploadId);

            // Step 2: Upload parts
            List<CompletedPart> completedParts = new ArrayList<>();
            int partNumber = 1;
            byte[] buffer = new byte[PART_SIZE];
            int bytesRead;
            try {
                while ((bytesRead = readFullyOrToEnd(inputStream, buffer)) > 0) {
                    // Create request to upload a part
                    UploadPartRequest uploadPartRequest = UploadPartRequest.builder()
                        .bucket(bucketName)
                        .key(key)
                        .uploadId(uploadId)
                        .partNumber(partNumber)
                        .build();

                    // If we didn't read a full buffer, create a properly sized byte array
                    RequestBody requestBody;
                    if (bytesRead < PART_SIZE) {
                        byte[] lastPartBuffer = new byte[bytesRead];
                        System.arraycopy(buffer, 0, lastPartBuffer, 0, bytesRead);
                        requestBody = RequestBody.fromBytes(lastPartBuffer);
                    } else {
                        requestBody = RequestBody.fromBytes(buffer);
                    }

                    // Upload the part and save the response's ETag
                    UploadPartResponse uploadPartResponse = s3Client.uploadPart(uploadPartRequest, requestBody);
                    CompletedPart part = CompletedPart.builder()
                        .partNumber(partNumber)
                        .eTag(uploadPartResponse.eTag())
                        .build();
                    completedParts.add(part);
                    System.out.println("Uploaded part " + partNumber + " with size " + bytesRead + " bytes");
                    partNumber++;
                }

                // Step 3: Complete the multipart upload
                CompletedMultipartUpload completedMultipartUpload = CompletedMultipartUpload.builder()
                    .parts(completedParts)
                    .build();
                CompleteMultipartUploadRequest completeRequest = CompleteMultipartUploadRequest.builder()
                    .bucket(bucketName)
                    .key(key)
                    .uploadId(uploadId)
                    .multipartUpload(completedMultipartUpload)
                    .build();
                CompleteMultipartUploadResponse completeResponse = s3Client.completeMultipartUpload(completeRequest);
                System.out.println("Multipart upload completed. Object URL: " + completeResponse.location());
            } catch (Exception e) {
                // If an error occurs, abort the multipart upload
                System.err.println("Error during multipart upload: " + e.getMessage());
                AbortMultipartUploadRequest abortRequest = AbortMultipartUploadRequest.builder()
                    .bucket(bucketName)
                    .key(key)
                    .uploadId(uploadId)
                    .build();
                s3Client.abortMultipartUpload(abortRequest);
                System.err.println("Multipart upload aborted");
            } finally {
                try {
                    inputStream.close();
                } catch (IOException e) {
                    System.err.println("Error closing input stream: " + e.getMessage());
                }
            }
        } finally {
            s3Client.close();
        }
    }

    /**
     * Reads from the input stream into the buffer, attempting to fill the buffer completely
     * or until the end of the stream is reached.
     *
     * @param inputStream the input stream to read from
     * @param buffer the buffer to fill
     * @return the number of bytes read, or -1 if the end of the stream is reached before any bytes are read
     * @throws IOException if an I/O error occurs
     */
    private static int readFullyOrToEnd(InputStream inputStream, byte[] buffer) throws IOException {
        int totalBytesRead = 0;
        int bytesRead;
        while (totalBytesRead < buffer.length) {
            bytesRead = inputStream.read(buffer, totalBytesRead, buffer.length - totalBytesRead);
            if (bytesRead == -1) {
                break; // End of stream
            }
            totalBytesRead += bytesRead;
        }
        return totalBytesRead > 0 ? totalBytesRead : -1;
    }
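The part-splitting logic in the multipart example can be looked at in isolation. The following JDK-only sketch shows how a fill-the-buffer read loop divides a stream into parts, so that only the final part can be shorter than the part size. It assumes Java 9 or later, where InputStream.readNBytes(byte[], int, int) provides the same fill-or-end behavior as the readFullyOrToEnd helper, except that it returns 0 instead of -1 at end of stream. The class PartSizeSketch and its method partSizes are hypothetical names used only for illustration.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical illustration class; it performs no S3 calls. */
final class PartSizeSketch {

    /** Returns the size of each part that a fill-the-buffer loop would produce. */
    static List<Integer> partSizes(InputStream in, int partSize) {
        List<Integer> sizes = new ArrayList<>();
        byte[] buffer = new byte[partSize];
        try {
            int n;
            // readNBytes blocks until the buffer is full or the stream ends,
            // and returns 0 once no bytes remain.
            while ((n = in.readNBytes(buffer, 0, buffer.length)) > 0) {
                sizes.add(n);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return sizes;
    }

    public static void main(String[] args) {
        // A 12-byte stream split into 5-byte parts.
        System.out.println(partSizes(new ByteArrayInputStream(new byte[12]), 5)); // prints [5, 5, 2]
    }
}
```

A plain InputStream.read call may return fewer bytes than requested even in the middle of a stream, which is why the example fills the buffer before uploading each part rather than uploading whatever a single read returns.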
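Note
For most use cases, we recommend using the asynchronous client API for streams of unknown size. This approach enables parallel transfers and offers a simpler programming interface, because the SDK splits the stream into multipart chunks if the stream is large.
Both the standard S3 asynchronous client with multipart enabled and the AWS CRT-based S3 client implement this approach. We show examples of this approach in the following section.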
Using the asynchronous API
You can pass null for the contentLength argument of fromInputStream(InputStream inputStream, Long contentLength, ExecutorService executor).
Example: Using the AWS CRT-based asynchronous client:
    public PutObjectResponse crtClient_stream_unknown_size(String bucketName, String key, InputStream inputStream) {
        S3AsyncClient s3AsyncClient = S3AsyncClient.crtCreate();
        ExecutorService executor = Executors.newSingleThreadExecutor();

        // 'null' indicates that the content length is unknown.
        AsyncRequestBody body = AsyncRequestBody.fromInputStream(inputStream, null, executor);

        CompletableFuture<PutObjectResponse> responseFuture =
            s3AsyncClient.putObject(r -> r.bucket(bucketName).key(key), body)
                .exceptionally(e -> {
                    if (e != null) {
                        logger.error(e.getMessage(), e);
                    }
                    return null;
                });

        PutObjectResponse response = responseFuture.join(); // Wait for the response.
        executor.shutdown();
        return response;
    }
Example: Using the standard asynchronous client with multipart enabled:
    public PutObjectResponse asyncClient_multipart_stream_unknown_size(String bucketName, String key, InputStream inputStream) {
        S3AsyncClient s3AsyncClient = S3AsyncClient.builder().multipartEnabled(true).build();
        ExecutorService executor = Executors.newSingleThreadExecutor();

        // 'null' indicates that the content length is unknown.
        AsyncRequestBody body = AsyncRequestBody.fromInputStream(inputStream, null, executor);

        CompletableFuture<PutObjectResponse> responseFuture =
            s3AsyncClient.putObject(r -> r.bucket(bucketName).key(key), body)
                .exceptionally(e -> {
                    if (e != null) {
                        logger.error(e.getMessage(), e);
                    }
                    return null;
                });

        PutObjectResponse response = responseFuture.join(); // Wait for the response.
        executor.shutdown();
        return response;
    }
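Both asynchronous examples map any failure to null inside exceptionally, so a caller sees a null response instead of an exception when the upload fails. The following JDK-only sketch contrasts that pattern with one that logs the failure but lets join() rethrow it. The class ExceptionallyDemo and its methods are hypothetical, and a plain CompletableFuture<String> stands in for the future returned by putObject.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

/** Hypothetical illustration class; it uses no AWS SDK types. */
final class ExceptionallyDemo {

    /** Same shape as the examples above: any failure becomes a null result. */
    static String joinSwallowing(CompletableFuture<String> future) {
        return future.exceptionally(e -> null).join();
    }

    /** Alternative: observe the failure, but keep it so that join() rethrows. */
    static String joinPropagating(CompletableFuture<String> future) {
        return future.whenComplete((value, error) -> {
            if (error != null) {
                System.err.println("Upload failed: " + error);
            }
        }).join();
    }

    public static void main(String[] args) {
        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("simulated failure"));

        System.out.println(joinSwallowing(failed)); // prints null
        try {
            joinPropagating(failed);
        } catch (CompletionException e) {
            System.out.println("caught: " + e.getCause().getMessage()); // prints caught: simulated failure
        }
    }
}
```

Which style fits depends on the caller: returning null keeps the method signature simple, whereas propagating the exception makes it harder to overlook a failed upload.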